diff --git a/pkg/capabilities/base_trigger.go b/pkg/capabilities/base_trigger.go index 1851a4e06a..57f75fe1c2 100644 --- a/pkg/capabilities/base_trigger.go +++ b/pkg/capabilities/base_trigger.go @@ -343,12 +343,27 @@ func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) { Id: event.EventId, } + if !safeSend(sendCh, wrapped) { + b.metrics.IncInboxFull(event.TriggerId) + b.lggr.Warnf("inbox full or closed for trigger %s", event.TriggerId) + return + } + + b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d", + b.capabilityId, event.TriggerId, event.EventId, attempts) +} + +func safeSend[T any](ch chan<- T, val T) (sent bool) { + defer func() { + if recover() != nil { + sent = false + } + }() + select { - case sendCh <- wrapped: - b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d", - b.capabilityId, event.TriggerId, event.EventId, attempts) + case ch <- val: + return true default: - b.metrics.IncInboxFull(event.TriggerId) - b.lggr.Warnf("inbox full for trigger %s", event.TriggerId) + return false } } diff --git a/pkg/capabilities/pb/capabilities.pb.go b/pkg/capabilities/pb/capabilities.pb.go index ca25029d69..af3460f34f 100644 --- a/pkg/capabilities/pb/capabilities.pb.go +++ b/pkg/capabilities/pb/capabilities.pb.go @@ -1096,21 +1096,22 @@ func (x *UnregisterFromWorkflowRequest) GetConfig() *pb.Map { } type InitialiseRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Config string `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` - ErrorLogId uint32 `protobuf:"varint,2,opt,name=error_log_id,json=errorLogId,proto3" json:"error_log_id,omitempty"` - PipelineRunnerId uint32 `protobuf:"varint,3,opt,name=pipeline_runner_id,json=pipelineRunnerId,proto3" json:"pipeline_runner_id,omitempty"` - TelemetryId uint32 `protobuf:"varint,4,opt,name=telemetry_id,json=telemetryId,proto3" json:"telemetry_id,omitempty"` - CapRegistryId uint32 `protobuf:"varint,5,opt,name=capRegistry_id,json=capRegistryId,proto3" json:"capRegistry_id,omitempty"` - KeyValueStoreId uint32 `protobuf:"varint,6,opt,name=keyValueStore_id,json=keyValueStoreId,proto3" json:"keyValueStore_id,omitempty"` - RelayerSetId uint32 `protobuf:"varint,7,opt,name=relayer_set_id,json=relayerSetId,proto3" json:"relayer_set_id,omitempty"` - OracleFactoryId uint32 `protobuf:"varint,8,opt,name=oracle_factory_id,json=oracleFactoryId,proto3" json:"oracle_factory_id,omitempty"` - GatewayConnectorId uint32 `protobuf:"varint,9,opt,name=gateway_connector_id,json=gatewayConnectorId,proto3" json:"gateway_connector_id,omitempty"` - KeystoreId uint32 `protobuf:"varint,10,opt,name=keystore_id,json=keystoreId,proto3" json:"keystore_id,omitempty"` - OrgResolverId uint32 `protobuf:"varint,11,opt,name=org_resolver_id,json=orgResolverId,proto3" json:"org_resolver_id,omitempty"` - CreSettingsId uint32 `protobuf:"varint,12,opt,name=cre_settings_id,json=creSettingsId,proto3" json:"cre_settings_id,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Config string `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` + ErrorLogId uint32 `protobuf:"varint,2,opt,name=error_log_id,json=errorLogId,proto3" json:"error_log_id,omitempty"` + PipelineRunnerId uint32 `protobuf:"varint,3,opt,name=pipeline_runner_id,json=pipelineRunnerId,proto3" json:"pipeline_runner_id,omitempty"` + TelemetryId uint32 `protobuf:"varint,4,opt,name=telemetry_id,json=telemetryId,proto3" json:"telemetry_id,omitempty"` + CapRegistryId uint32 `protobuf:"varint,5,opt,name=capRegistry_id,json=capRegistryId,proto3" json:"capRegistry_id,omitempty"` + KeyValueStoreId uint32 `protobuf:"varint,6,opt,name=keyValueStore_id,json=keyValueStoreId,proto3" json:"keyValueStore_id,omitempty"` + RelayerSetId uint32 `protobuf:"varint,7,opt,name=relayer_set_id,json=relayerSetId,proto3" json:"relayer_set_id,omitempty"` + OracleFactoryId uint32 `protobuf:"varint,8,opt,name=oracle_factory_id,json=oracleFactoryId,proto3" json:"oracle_factory_id,omitempty"` + GatewayConnectorId uint32 `protobuf:"varint,9,opt,name=gateway_connector_id,json=gatewayConnectorId,proto3" json:"gateway_connector_id,omitempty"` + KeystoreId uint32 `protobuf:"varint,10,opt,name=keystore_id,json=keystoreId,proto3" json:"keystore_id,omitempty"` + OrgResolverId uint32 `protobuf:"varint,11,opt,name=org_resolver_id,json=orgResolverId,proto3" json:"org_resolver_id,omitempty"` + CreSettingsId uint32 `protobuf:"varint,12,opt,name=cre_settings_id,json=creSettingsId,proto3" json:"cre_settings_id,omitempty"` + TriggerEventStoreId uint32 `protobuf:"varint,13,opt,name=trigger_event_store_id,json=triggerEventStoreId,proto3" json:"trigger_event_store_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *InitialiseRequest) Reset() { @@ -1227,6 +1228,13 @@ func (x *InitialiseRequest) GetCreSettingsId() uint32 { return 0 } +func (x *InitialiseRequest) GetTriggerEventStoreId() uint32 { + if x != nil { + return x.TriggerEventStoreId + } + return 0 +} + type CapabilityInfosReply struct { state protoimpl.MessageState `protogen:"open.v1"` Infos []*CapabilityInfoReply `protobuf:"bytes,1,rep,name=infos,proto3" json:"infos,omitempty"` @@ -1406,7 +1414,7 @@ const file_capabilities_proto_rawDesc = "" + "\x06config\x18\x02 \x01(\v2\x0e.values.v1.MapR\x06config\"\x87\x01\n" + "\x1dUnregisterFromWorkflowRequest\x12>\n" + "\bmetadata\x18\x01 \x01(\v2\".capabilities.RegistrationMetadataR\bmetadata\x12&\n" + - "\x06config\x18\x02 \x01(\v2\x0e.values.v1.MapR\x06config\"\xe5\x03\n" + + "\x06config\x18\x02 \x01(\v2\x0e.values.v1.MapR\x06config\"\x9a\x04\n" + "\x11InitialiseRequest\x12\x16\n" + "\x06config\x18\x01 \x01(\tR\x06config\x12 \n" + "\ferror_log_id\x18\x02 \x01(\rR\n" + @@ -1422,7 +1430,8 @@ const file_capabilities_proto_rawDesc = "" + " \x01(\rR\n" + "keystoreId\x12&\n" + "\x0forg_resolver_id\x18\v \x01(\rR\rorgResolverId\x12&\n" + - "\x0fcre_settings_id\x18\f \x01(\rR\rcreSettingsId\"O\n" + + "\x0fcre_settings_id\x18\f \x01(\rR\rcreSettingsId\x123\n" + + "\x16trigger_event_store_id\x18\r \x01(\rR\x13triggerEventStoreId\"O\n" + "\x14CapabilityInfosReply\x127\n" + "\x05infos\x18\x01 \x03(\v2!.capabilities.CapabilityInfoReplyR\x05infos\"@\n" + "\x0eSettingsUpdate\x12\x1a\n" + diff --git a/pkg/capabilities/pb/capabilities.proto b/pkg/capabilities/pb/capabilities.proto index 19698cc9ef..ee55b6764c 100644 --- a/pkg/capabilities/pb/capabilities.proto +++ b/pkg/capabilities/pb/capabilities.proto @@ -182,6 +182,7 @@ message InitialiseRequest { uint32 keystore_id = 10; uint32 org_resolver_id = 11; uint32 cre_settings_id = 12; + uint32 trigger_event_store_id = 13; } message CapabilityInfosReply { diff --git a/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go b/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go index 255e90b34a..62ff468700 100644 --- a/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go +++ b/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/capability" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/errorlog" + "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/eventstore" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/gateway" keystoreservice "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keystore" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keyvalue" @@ -196,19 +197,34 @@ func (c *StandardCapabilitiesClient) Initialise(ctx context.Context, dependencie resources = append(resources, creSettingsRes) } + triggerEventStore := dependencies.TriggerEventStore + var triggerEventStoreID uint32 + if triggerEventStore != nil { + var triggerEventStoreRes net.Resource + triggerEventStoreID, triggerEventStoreRes, err = c.ServeNew("TriggerEventStore", func(s *grpc.Server) { + pb.RegisterEventStoreServer(s, eventstore.NewServer(triggerEventStore)) + }) + if err != nil { + c.CloseAll(resources...) + return fmt.Errorf("failed to serve trigger event store: %w", err) + } + resources = append(resources, triggerEventStoreRes) + } + _, err = c.StandardCapabilitiesClient.Initialise(ctx, &capabilitiespb.InitialiseRequest{ - Config: config, - ErrorLogId: errorLogID, - PipelineRunnerId: pipelineRunnerID, - TelemetryId: telemetryID, - CapRegistryId: capabilitiesRegistryID, - KeyValueStoreId: keyValueStoreID, - RelayerSetId: relayerSetID, - OracleFactoryId: oracleFactoryID, - GatewayConnectorId: gatewayConnectorID, - KeystoreId: keyStoreID, - OrgResolverId: orgResolverID, - CreSettingsId: creSettingsID, + Config: config, + ErrorLogId: errorLogID, + PipelineRunnerId: pipelineRunnerID, + TelemetryId: telemetryID, + CapRegistryId: capabilitiesRegistryID, + KeyValueStoreId: keyValueStoreID, + RelayerSetId: relayerSetID, + OracleFactoryId: oracleFactoryID, + GatewayConnectorId: gatewayConnectorID, + KeystoreId: keyStoreID, + OrgResolverId: orgResolverID, + CreSettingsId: creSettingsID, + TriggerEventStoreId: triggerEventStoreID, }) if err != nil { @@ -375,6 +391,17 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca creSettings = settings.NewClient(s.Logger, creSettingsConn) } + var triggerEventStoreClient capabilities.EventStore + if request.TriggerEventStoreId > 0 { + triggerEventStoreConn, err := s.Dial(request.TriggerEventStoreId) + if err != nil { + s.CloseAll(resources...) + return nil, net.ErrConnDial{Name: "TriggerEventStore", ID: request.TriggerEventStoreId, Err: err} + } + resources = append(resources, net.Resource{Closer: triggerEventStoreConn, Name: "TriggerEventStore"}) + triggerEventStoreClient = eventstore.NewClient(triggerEventStoreConn) + } + dependencies := core.StandardCapabilitiesDependencies{ Config: request.Config, TelemetryService: telemetry, @@ -388,6 +415,7 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca P2PKeystore: keyStore, OrgResolver: orgResolver, CRESettings: creSettings, + TriggerEventStore: triggerEventStoreClient, } if err = s.impl.Initialise(ctx, dependencies); err != nil { diff --git a/pkg/loop/internal/core/services/eventstore/client.go b/pkg/loop/internal/core/services/eventstore/client.go new file mode 100644 index 0000000000..7a27a9ee43 --- /dev/null +++ b/pkg/loop/internal/core/services/eventstore/client.go @@ -0,0 +1,87 @@ +package eventstore + +import ( + "context" + "time" + + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb" +) + +var _ capabilities.EventStore = (*Client)(nil) + +type Client struct { + grpc pb.EventStoreClient +} + +func NewClient(cc grpc.ClientConnInterface) *Client { + return &Client{grpc: pb.NewEventStoreClient(cc)} +} + +func (c *Client) Insert(ctx context.Context, rec capabilities.PendingEvent) error { + ev := &pb.PendingEventProto{ + TriggerId: rec.TriggerId, + EventId: rec.EventId, + Payload: rec.Payload, + FirstAt: timestamppb.New(rec.FirstAt), + Attempts: int32(rec.Attempts), + } + if !rec.LastSentAt.IsZero() { + ev.LastSentAt = timestamppb.New(rec.LastSentAt) + } + _, err := c.grpc.Insert(ctx, &pb.InsertEventRequest{Event: ev}) + return err +} + +func (c *Client) UpdateDelivery(ctx context.Context, triggerId string, eventId string, lastSentAt time.Time, attempts int) error { + _, err := c.grpc.UpdateDelivery(ctx, &pb.UpdateDeliveryRequest{ + TriggerId: triggerId, + EventId: eventId, + LastSentAt: timestamppb.New(lastSentAt), + Attempts: int32(attempts), + }) + return err +} + +func (c *Client) List(ctx context.Context) ([]capabilities.PendingEvent, error) { + resp, err := c.grpc.List(ctx, &emptypb.Empty{}) + if err != nil { + return nil, err + } + events := make([]capabilities.PendingEvent, 0, len(resp.GetEvents())) + for _, ev := range resp.GetEvents() { + rec := capabilities.PendingEvent{ + TriggerId: ev.GetTriggerId(), + EventId: ev.GetEventId(), + Payload: ev.GetPayload(), + Attempts: int(ev.GetAttempts()), + } + if t := ev.GetFirstAt(); t != nil { + rec.FirstAt = t.AsTime() + } + if t := ev.GetLastSentAt(); t != nil { + rec.LastSentAt = t.AsTime() + } + events = append(events, rec) + } + return events, nil +} + +func (c *Client) DeleteEvent(ctx context.Context, triggerId string, eventId string) error { + _, err := c.grpc.DeleteEvent(ctx, &pb.DeleteEventRequest{ + TriggerId: triggerId, + EventId: eventId, + }) + return err +} + +func (c *Client) DeleteEventsForTrigger(ctx context.Context, triggerID string) error { + _, err := c.grpc.DeleteEventsForTrigger(ctx, &pb.DeleteEventsForTriggerRequest{ + TriggerId: triggerID, + }) + return err +} diff --git a/pkg/loop/internal/core/services/eventstore/event_store_test.go b/pkg/loop/internal/core/services/eventstore/event_store_test.go new file mode 100644 index 0000000000..f2a31d048b --- /dev/null +++ b/pkg/loop/internal/core/services/eventstore/event_store_test.go @@ -0,0 +1,393 @@ +package eventstore + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb" +) + +var ( + testTime1 = time.Date(2025, 1, 15, 10, 30, 0, 0, time.UTC) + testTime2 = time.Date(2025, 1, 15, 11, 0, 0, 0, time.UTC) +) + +func TestClient_InsertAndList(t *testing.T) { + ctx := t.Context() + client := Client{grpc: newTestGRPCClient()} + + ev := capabilities.PendingEvent{ + TriggerId: "trigger-1", + EventId: "event-1", + Payload: []byte("payload-data"), + FirstAt: testTime1, + Attempts: 0, + } + + err := client.Insert(ctx, ev) + require.NoError(t, err) + + events, err := client.List(ctx) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, "trigger-1", events[0].TriggerId) + assert.Equal(t, "event-1", events[0].EventId) + assert.Equal(t, []byte("payload-data"), events[0].Payload) + assert.Equal(t, testTime1, events[0].FirstAt) + assert.True(t, events[0].LastSentAt.IsZero()) +} + +func TestClient_InsertWithLastSentAt(t *testing.T) { + ctx := t.Context() + mock := newTestGRPCClient() + client := Client{grpc: mock} + + ev := capabilities.PendingEvent{ + TriggerId: "trigger-1", + EventId: "event-1", + Payload: []byte("data"), + FirstAt: testTime1, + LastSentAt: testTime2, + Attempts: 3, + } + + require.NoError(t, client.Insert(ctx, ev)) + + events, err := client.List(ctx) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, testTime2, events[0].LastSentAt) + assert.Equal(t, 3, events[0].Attempts) +} + +func TestClient_UpdateDelivery(t *testing.T) { + ctx := t.Context() + mock := newTestGRPCClient() + client := Client{grpc: mock} + + require.NoError(t, client.Insert(ctx, capabilities.PendingEvent{ + TriggerId: "trigger-1", + EventId: "event-1", + Payload: []byte("data"), + FirstAt: testTime1, + })) + + err := client.UpdateDelivery(ctx, "trigger-1", "event-1", testTime2, 5) + require.NoError(t, err) + + events, err := client.List(ctx) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, testTime2, events[0].LastSentAt) + assert.Equal(t, 5, events[0].Attempts) +} + +func TestClient_DeleteEvent(t *testing.T) { + ctx := t.Context() + client := Client{grpc: newTestGRPCClient()} + + require.NoError(t, client.Insert(ctx, capabilities.PendingEvent{ + TriggerId: "trigger-1", EventId: "event-1", Payload: []byte("a"), FirstAt: testTime1, + })) + require.NoError(t, client.Insert(ctx, capabilities.PendingEvent{ + TriggerId: "trigger-1", EventId: "event-2", Payload: []byte("b"), FirstAt: testTime1, + })) + + err := client.DeleteEvent(ctx, "trigger-1", "event-1") + require.NoError(t, err) + + events, err := client.List(ctx) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, "event-2", events[0].EventId) +} + +func TestClient_DeleteEventsForTrigger(t *testing.T) { + ctx := t.Context() + client := Client{grpc: newTestGRPCClient()} + + require.NoError(t, client.Insert(ctx, capabilities.PendingEvent{ + TriggerId: "trigger-1", EventId: "event-1", Payload: []byte("a"), FirstAt: testTime1, + })) + require.NoError(t, client.Insert(ctx, capabilities.PendingEvent{ + TriggerId: "trigger-1", EventId: "event-2", Payload: []byte("b"), FirstAt: testTime1, + })) + require.NoError(t, client.Insert(ctx, capabilities.PendingEvent{ + TriggerId: "trigger-2", EventId: "event-3", Payload: []byte("c"), FirstAt: testTime1, + })) + + err := client.DeleteEventsForTrigger(ctx, "trigger-1") + require.NoError(t, err) + + events, err := client.List(ctx) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, "trigger-2", events[0].TriggerId) +} + +func TestServer_InsertAndList(t *testing.T) { + ctx := t.Context() + server := Server{impl: capabilities.NewMemEventStore()} + + _, err := server.Insert(ctx, &pb.InsertEventRequest{ + Event: &pb.PendingEventProto{ + TriggerId: "trigger-1", + EventId: "event-1", + Payload: []byte("payload-data"), + FirstAt: timestamppb.New(testTime1), + Attempts: 0, + }, + }) + require.NoError(t, err) + + resp, err := server.List(ctx, &emptypb.Empty{}) + require.NoError(t, err) + require.Len(t, resp.Events, 1) + assert.Equal(t, "trigger-1", resp.Events[0].TriggerId) + assert.Equal(t, "event-1", resp.Events[0].EventId) + assert.Equal(t, []byte("payload-data"), resp.Events[0].Payload) + assert.Equal(t, testTime1, resp.Events[0].FirstAt.AsTime()) +} + +func TestServer_UpdateDelivery(t *testing.T) { + ctx := t.Context() + server := Server{impl: capabilities.NewMemEventStore()} + + _, err := server.Insert(ctx, &pb.InsertEventRequest{ + Event: &pb.PendingEventProto{ + TriggerId: "trigger-1", + EventId: "event-1", + Payload: []byte("data"), + FirstAt: timestamppb.New(testTime1), + }, + }) + require.NoError(t, err) + + _, err = server.UpdateDelivery(ctx, &pb.UpdateDeliveryRequest{ + TriggerId: "trigger-1", + EventId: "event-1", + LastSentAt: timestamppb.New(testTime2), + Attempts: 3, + }) + require.NoError(t, err) + + resp, err := server.List(ctx, &emptypb.Empty{}) + require.NoError(t, err) + require.Len(t, resp.Events, 1) + assert.Equal(t, testTime2, resp.Events[0].LastSentAt.AsTime()) + assert.Equal(t, int32(3), resp.Events[0].Attempts) +} + +func TestServer_DeleteEvent(t *testing.T) { + ctx := t.Context() + server := Server{impl: capabilities.NewMemEventStore()} + + _, err := server.Insert(ctx, &pb.InsertEventRequest{ + Event: &pb.PendingEventProto{ + TriggerId: "trigger-1", EventId: "event-1", Payload: []byte("a"), FirstAt: timestamppb.New(testTime1), + }, + }) + require.NoError(t, err) + _, err = server.Insert(ctx, &pb.InsertEventRequest{ + Event: &pb.PendingEventProto{ + TriggerId: "trigger-1", EventId: "event-2", Payload: []byte("b"), FirstAt: timestamppb.New(testTime1), + }, + }) + require.NoError(t, err) + + _, err = server.DeleteEvent(ctx, &pb.DeleteEventRequest{TriggerId: "trigger-1", EventId: "event-1"}) + require.NoError(t, err) + + resp, err := server.List(ctx, &emptypb.Empty{}) + require.NoError(t, err) + require.Len(t, resp.Events, 1) + assert.Equal(t, "event-2", resp.Events[0].EventId) +} + +func TestServer_DeleteEventsForTrigger(t *testing.T) { + ctx := t.Context() + server := Server{impl: capabilities.NewMemEventStore()} + + for _, ev := range []struct{ tid, eid string }{ + {"trigger-1", "event-1"}, + {"trigger-1", "event-2"}, + {"trigger-2", "event-3"}, + } { + _, err := server.Insert(ctx, &pb.InsertEventRequest{ + Event: &pb.PendingEventProto{ + TriggerId: ev.tid, EventId: ev.eid, Payload: []byte("x"), FirstAt: timestamppb.New(testTime1), + }, + }) + require.NoError(t, err) + } + + _, err := server.DeleteEventsForTrigger(ctx, &pb.DeleteEventsForTriggerRequest{TriggerId: "trigger-1"}) + require.NoError(t, err) + + resp, err := server.List(ctx, &emptypb.Empty{}) + require.NoError(t, err) + require.Len(t, resp.Events, 1) + assert.Equal(t, "trigger-2", resp.Events[0].TriggerId) +} + +func TestServer_NilImpl(t *testing.T) { + ctx := t.Context() + server := Server{} + + _, err := server.Insert(ctx, &pb.InsertEventRequest{Event: &pb.PendingEventProto{}}) + assert.Error(t, err) + + _, err = server.UpdateDelivery(ctx, &pb.UpdateDeliveryRequest{}) + assert.Error(t, err) + + _, err = server.List(ctx, &emptypb.Empty{}) + assert.Error(t, err) + + _, err = server.DeleteEvent(ctx, &pb.DeleteEventRequest{}) + assert.Error(t, err) + + _, err = server.DeleteEventsForTrigger(ctx, &pb.DeleteEventsForTriggerRequest{}) + assert.Error(t, err) +} + +func TestRoundTrip_ClientThroughServer(t *testing.T) { + ctx := t.Context() + store := capabilities.NewMemEventStore() + server := NewServer(store) + client := Client{grpc: &serverBackedGRPCClient{server: server}} + + ev := capabilities.PendingEvent{ + TriggerId: "trigger-rt", + EventId: "event-rt", + Payload: []byte("round-trip-payload"), + FirstAt: testTime1, + LastSentAt: testTime2, + Attempts: 2, + } + + require.NoError(t, client.Insert(ctx, ev)) + + events, err := client.List(ctx) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, ev.TriggerId, events[0].TriggerId) + assert.Equal(t, ev.EventId, events[0].EventId) + assert.Equal(t, ev.Payload, events[0].Payload) + assert.Equal(t, ev.FirstAt, events[0].FirstAt) + assert.Equal(t, ev.LastSentAt, events[0].LastSentAt) + assert.Equal(t, ev.Attempts, events[0].Attempts) + + newTime := testTime2.Add(10 * time.Minute) + require.NoError(t, client.UpdateDelivery(ctx, "trigger-rt", "event-rt", newTime, 3)) + + events, err = client.List(ctx) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, newTime, events[0].LastSentAt) + assert.Equal(t, 3, events[0].Attempts) + + require.NoError(t, client.DeleteEvent(ctx, "trigger-rt", "event-rt")) + + events, err = client.List(ctx) + require.NoError(t, err) + assert.Empty(t, events) +} + +// testGRPCClient is a mock pb.EventStoreClient backed by an in-memory store. +type testGRPCClient struct { + mu sync.Mutex + events map[string]map[string]*pb.PendingEventProto // triggerID -> eventID -> event +} + +func newTestGRPCClient() *testGRPCClient { + return &testGRPCClient{events: make(map[string]map[string]*pb.PendingEventProto)} +} + +func (t *testGRPCClient) Insert(_ context.Context, in *pb.InsertEventRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + t.mu.Lock() + defer t.mu.Unlock() + ev := in.GetEvent() + if t.events[ev.TriggerId] == nil { + t.events[ev.TriggerId] = make(map[string]*pb.PendingEventProto) + } + t.events[ev.TriggerId][ev.EventId] = ev + return &emptypb.Empty{}, nil +} + +func (t *testGRPCClient) UpdateDelivery(_ context.Context, in *pb.UpdateDeliveryRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + t.mu.Lock() + defer t.mu.Unlock() + if trigger, ok := t.events[in.TriggerId]; ok { + if ev, ok := trigger[in.EventId]; ok { + ev.LastSentAt = in.LastSentAt + ev.Attempts = in.Attempts + } + } + return &emptypb.Empty{}, nil +} + +func (t *testGRPCClient) List(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ListEventsResponse, error) { + t.mu.Lock() + defer t.mu.Unlock() + var out []*pb.PendingEventProto + for _, trigger := range t.events { + for _, ev := range trigger { + out = append(out, ev) + } + } + return &pb.ListEventsResponse{Events: out}, nil +} + +func (t *testGRPCClient) DeleteEvent(_ context.Context, in *pb.DeleteEventRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + t.mu.Lock() + defer t.mu.Unlock() + if trigger, ok := t.events[in.TriggerId]; ok { + delete(trigger, in.EventId) + if len(trigger) == 0 { + delete(t.events, in.TriggerId) + } + } + return &emptypb.Empty{}, nil +} + +func (t *testGRPCClient) DeleteEventsForTrigger(_ context.Context, in *pb.DeleteEventsForTriggerRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + t.mu.Lock() + defer t.mu.Unlock() + delete(t.events, in.TriggerId) + return &emptypb.Empty{}, nil +} + +// serverBackedGRPCClient implements pb.EventStoreClient by calling the Server directly, +// simulating the full serialization round-trip without a real gRPC connection. +type serverBackedGRPCClient struct { + server *Server +} + +func (s *serverBackedGRPCClient) Insert(ctx context.Context, in *pb.InsertEventRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + return s.server.Insert(ctx, in) +} + +func (s *serverBackedGRPCClient) UpdateDelivery(ctx context.Context, in *pb.UpdateDeliveryRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + return s.server.UpdateDelivery(ctx, in) +} + +func (s *serverBackedGRPCClient) List(ctx context.Context, in *emptypb.Empty, _ ...grpc.CallOption) (*pb.ListEventsResponse, error) { + return s.server.List(ctx, in) +} + +func (s *serverBackedGRPCClient) DeleteEvent(ctx context.Context, in *pb.DeleteEventRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + return s.server.DeleteEvent(ctx, in) +} + +func (s *serverBackedGRPCClient) DeleteEventsForTrigger(ctx context.Context, in *pb.DeleteEventsForTriggerRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + return s.server.DeleteEventsForTrigger(ctx, in) +} diff --git a/pkg/loop/internal/core/services/eventstore/server.go b/pkg/loop/internal/core/services/eventstore/server.go new file mode 100644 index 0000000000..73f4b55390 --- /dev/null +++ b/pkg/loop/internal/core/services/eventstore/server.go @@ -0,0 +1,104 @@ +package eventstore + +import ( + "context" + "fmt" + + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb" +) + +var _ pb.EventStoreServer = (*Server)(nil) + +type Server struct { + pb.UnimplementedEventStoreServer + impl capabilities.EventStore +} + +func NewServer(impl capabilities.EventStore) *Server { + return &Server{impl: impl} +} + +func (s *Server) Insert(ctx context.Context, req *pb.InsertEventRequest) (*emptypb.Empty, error) { + if s.impl == nil { + return nil, fmt.Errorf("event store implementation is nil") + } + ev := req.GetEvent() + rec := capabilities.PendingEvent{ + TriggerId: ev.GetTriggerId(), + EventId: ev.GetEventId(), + Payload: ev.GetPayload(), + Attempts: int(ev.GetAttempts()), + } + if t := ev.GetFirstAt(); t != nil { + rec.FirstAt = t.AsTime() + } + if t := ev.GetLastSentAt(); t != nil { + rec.LastSentAt = t.AsTime() + } + if err := s.impl.Insert(ctx, rec); err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} + +func (s *Server) UpdateDelivery(ctx context.Context, req *pb.UpdateDeliveryRequest) (*emptypb.Empty, error) { + if s.impl == nil { + return nil, fmt.Errorf("event store implementation is nil") + } + var lastSentAt = req.GetLastSentAt().AsTime() + if err := s.impl.UpdateDelivery(ctx, req.GetTriggerId(), req.GetEventId(), lastSentAt, int(req.GetAttempts())); err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} + +func (s *Server) List(ctx context.Context, _ *emptypb.Empty) (*pb.ListEventsResponse, error) { + if s.impl == nil { + return nil, fmt.Errorf("event store implementation is nil") + } + events, err := s.impl.List(ctx) + if err != nil { + return nil, err + } + resp := &pb.ListEventsResponse{ + Events: make([]*pb.PendingEventProto, 0, len(events)), + } + for _, ev := range events { + pev := &pb.PendingEventProto{ + TriggerId: ev.TriggerId, + EventId: ev.EventId, + Payload: ev.Payload, + FirstAt: timestamppb.New(ev.FirstAt), + Attempts: int32(ev.Attempts), + } + if !ev.LastSentAt.IsZero() { + pev.LastSentAt = timestamppb.New(ev.LastSentAt) + } + resp.Events = append(resp.Events, pev) + } + return resp, nil +} + +func (s *Server) DeleteEvent(ctx context.Context, req *pb.DeleteEventRequest) (*emptypb.Empty, error) { + if s.impl == nil { + return nil, fmt.Errorf("event store implementation is nil") + } + if err := s.impl.DeleteEvent(ctx, req.GetTriggerId(), req.GetEventId()); err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} + +func (s *Server) DeleteEventsForTrigger(ctx context.Context, req *pb.DeleteEventsForTriggerRequest) (*emptypb.Empty, error) { + if s.impl == nil { + return nil, fmt.Errorf("event store implementation is nil") + } + if err := s.impl.DeleteEventsForTrigger(ctx, req.GetTriggerId()); err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} diff --git a/pkg/loop/internal/pb/event_store.pb.go b/pkg/loop/internal/pb/event_store.pb.go new file mode 100644 index 0000000000..9f755eceea --- /dev/null +++ b/pkg/loop/internal/pb/event_store.pb.go @@ -0,0 +1,470 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v5.29.3 +// source: event_store.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PendingEventProto struct { + state protoimpl.MessageState `protogen:"open.v1"` + TriggerId string `protobuf:"bytes,1,opt,name=trigger_id,json=triggerId,proto3" json:"trigger_id,omitempty"` + EventId string `protobuf:"bytes,2,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` + FirstAt *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=first_at,json=firstAt,proto3" json:"first_at,omitempty"` + LastSentAt *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=last_sent_at,json=lastSentAt,proto3" json:"last_sent_at,omitempty"` + Attempts int32 `protobuf:"varint,6,opt,name=attempts,proto3" json:"attempts,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PendingEventProto) Reset() { + *x = PendingEventProto{} + mi := &file_event_store_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PendingEventProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PendingEventProto) ProtoMessage() {} + +func (x *PendingEventProto) ProtoReflect() protoreflect.Message { + mi := &file_event_store_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PendingEventProto.ProtoReflect.Descriptor instead. +func (*PendingEventProto) Descriptor() ([]byte, []int) { + return file_event_store_proto_rawDescGZIP(), []int{0} +} + +func (x *PendingEventProto) GetTriggerId() string { + if x != nil { + return x.TriggerId + } + return "" +} + +func (x *PendingEventProto) GetEventId() string { + if x != nil { + return x.EventId + } + return "" +} + +func (x *PendingEventProto) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *PendingEventProto) GetFirstAt() *timestamppb.Timestamp { + if x != nil { + return x.FirstAt + } + return nil +} + +func (x *PendingEventProto) GetLastSentAt() *timestamppb.Timestamp { + if x != nil { + return x.LastSentAt + } + return nil +} + +func (x *PendingEventProto) GetAttempts() int32 { + if x != nil { + return x.Attempts + } + return 0 +} + +type InsertEventRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Event *PendingEventProto `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *InsertEventRequest) Reset() { + *x = InsertEventRequest{} + mi := &file_event_store_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *InsertEventRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InsertEventRequest) ProtoMessage() {} + +func (x *InsertEventRequest) ProtoReflect() protoreflect.Message { + mi := &file_event_store_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InsertEventRequest.ProtoReflect.Descriptor instead. +func (*InsertEventRequest) Descriptor() ([]byte, []int) { + return file_event_store_proto_rawDescGZIP(), []int{1} +} + +func (x *InsertEventRequest) GetEvent() *PendingEventProto { + if x != nil { + return x.Event + } + return nil +} + +type UpdateDeliveryRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + TriggerId string `protobuf:"bytes,1,opt,name=trigger_id,json=triggerId,proto3" json:"trigger_id,omitempty"` + EventId string `protobuf:"bytes,2,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + LastSentAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=last_sent_at,json=lastSentAt,proto3" json:"last_sent_at,omitempty"` + Attempts int32 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UpdateDeliveryRequest) Reset() { + *x = UpdateDeliveryRequest{} + mi := &file_event_store_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UpdateDeliveryRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateDeliveryRequest) ProtoMessage() {} + +func (x *UpdateDeliveryRequest) ProtoReflect() protoreflect.Message { + mi := &file_event_store_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateDeliveryRequest.ProtoReflect.Descriptor instead. +func (*UpdateDeliveryRequest) Descriptor() ([]byte, []int) { + return file_event_store_proto_rawDescGZIP(), []int{2} +} + +func (x *UpdateDeliveryRequest) GetTriggerId() string { + if x != nil { + return x.TriggerId + } + return "" +} + +func (x *UpdateDeliveryRequest) GetEventId() string { + if x != nil { + return x.EventId + } + return "" +} + +func (x *UpdateDeliveryRequest) GetLastSentAt() *timestamppb.Timestamp { + if x != nil { + return x.LastSentAt + } + return nil +} + +func (x *UpdateDeliveryRequest) GetAttempts() int32 { + if x != nil { + return x.Attempts + } + return 0 +} + +type ListEventsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Events []*PendingEventProto `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListEventsResponse) Reset() { + *x = ListEventsResponse{} + mi := &file_event_store_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListEventsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListEventsResponse) ProtoMessage() {} + +func (x *ListEventsResponse) ProtoReflect() protoreflect.Message { + mi := &file_event_store_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListEventsResponse.ProtoReflect.Descriptor instead. +func (*ListEventsResponse) Descriptor() ([]byte, []int) { + return file_event_store_proto_rawDescGZIP(), []int{3} +} + +func (x *ListEventsResponse) GetEvents() []*PendingEventProto { + if x != nil { + return x.Events + } + return nil +} + +type DeleteEventRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + TriggerId string `protobuf:"bytes,1,opt,name=trigger_id,json=triggerId,proto3" json:"trigger_id,omitempty"` + EventId string `protobuf:"bytes,2,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DeleteEventRequest) Reset() { + *x = DeleteEventRequest{} + mi := &file_event_store_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DeleteEventRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteEventRequest) ProtoMessage() {} + +func (x *DeleteEventRequest) ProtoReflect() protoreflect.Message { + mi := &file_event_store_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteEventRequest.ProtoReflect.Descriptor instead. +func (*DeleteEventRequest) Descriptor() ([]byte, []int) { + return file_event_store_proto_rawDescGZIP(), []int{4} +} + +func (x *DeleteEventRequest) GetTriggerId() string { + if x != nil { + return x.TriggerId + } + return "" +} + +func (x *DeleteEventRequest) GetEventId() string { + if x != nil { + return x.EventId + } + return "" +} + +type DeleteEventsForTriggerRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + TriggerId string `protobuf:"bytes,1,opt,name=trigger_id,json=triggerId,proto3" json:"trigger_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DeleteEventsForTriggerRequest) Reset() { + *x = DeleteEventsForTriggerRequest{} + mi := &file_event_store_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DeleteEventsForTriggerRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteEventsForTriggerRequest) ProtoMessage() {} + +func (x *DeleteEventsForTriggerRequest) ProtoReflect() protoreflect.Message { + mi := &file_event_store_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteEventsForTriggerRequest.ProtoReflect.Descriptor instead. +func (*DeleteEventsForTriggerRequest) Descriptor() ([]byte, []int) { + return file_event_store_proto_rawDescGZIP(), []int{5} +} + +func (x *DeleteEventsForTriggerRequest) GetTriggerId() string { + if x != nil { + return x.TriggerId + } + return "" +} + +var File_event_store_proto protoreflect.FileDescriptor + +const file_event_store_proto_rawDesc = "" + + "\n" + + "\x11event_store.proto\x12\x04loop\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xf8\x01\n" + + "\x11PendingEventProto\x12\x1d\n" + + "\n" + + "trigger_id\x18\x01 \x01(\tR\ttriggerId\x12\x19\n" + + "\bevent_id\x18\x02 \x01(\tR\aeventId\x12\x18\n" + + "\apayload\x18\x03 \x01(\fR\apayload\x125\n" + + "\bfirst_at\x18\x04 \x01(\v2\x1a.google.protobuf.TimestampR\afirstAt\x12<\n" + + "\flast_sent_at\x18\x05 \x01(\v2\x1a.google.protobuf.TimestampR\n" + + "lastSentAt\x12\x1a\n" + + "\battempts\x18\x06 \x01(\x05R\battempts\"C\n" + + "\x12InsertEventRequest\x12-\n" + + "\x05event\x18\x01 \x01(\v2\x17.loop.PendingEventProtoR\x05event\"\xab\x01\n" + + "\x15UpdateDeliveryRequest\x12\x1d\n" + + "\n" + + "trigger_id\x18\x01 \x01(\tR\ttriggerId\x12\x19\n" + + "\bevent_id\x18\x02 \x01(\tR\aeventId\x12<\n" + + "\flast_sent_at\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\n" + + "lastSentAt\x12\x1a\n" + + "\battempts\x18\x04 \x01(\x05R\battempts\"E\n" + + "\x12ListEventsResponse\x12/\n" + + "\x06events\x18\x01 \x03(\v2\x17.loop.PendingEventProtoR\x06events\"N\n" + + "\x12DeleteEventRequest\x12\x1d\n" + + "\n" + + "trigger_id\x18\x01 \x01(\tR\ttriggerId\x12\x19\n" + + "\bevent_id\x18\x02 \x01(\tR\aeventId\">\n" + + "\x1dDeleteEventsForTriggerRequest\x12\x1d\n" + + "\n" + + "trigger_id\x18\x01 \x01(\tR\ttriggerId2\xe1\x02\n" + + "\n" + + "EventStore\x12:\n" + + "\x06Insert\x12\x18.loop.InsertEventRequest\x1a\x16.google.protobuf.Empty\x12E\n" + + "\x0eUpdateDelivery\x12\x1b.loop.UpdateDeliveryRequest\x1a\x16.google.protobuf.Empty\x128\n" + + "\x04List\x12\x16.google.protobuf.Empty\x1a\x18.loop.ListEventsResponse\x12?\n" + + "\vDeleteEvent\x12\x18.loop.DeleteEventRequest\x1a\x16.google.protobuf.Empty\x12U\n" + + "\x16DeleteEventsForTrigger\x12#.loop.DeleteEventsForTriggerRequest\x1a\x16.google.protobuf.EmptyBCZAgithub.com/smartcontractkit/chainlink-common/pkg/loop/internal/pbb\x06proto3" + +var ( + file_event_store_proto_rawDescOnce sync.Once + file_event_store_proto_rawDescData []byte +) + +func file_event_store_proto_rawDescGZIP() []byte { + file_event_store_proto_rawDescOnce.Do(func() { + file_event_store_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_event_store_proto_rawDesc), len(file_event_store_proto_rawDesc))) + }) + return file_event_store_proto_rawDescData +} + +var file_event_store_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_event_store_proto_goTypes = []any{ + (*PendingEventProto)(nil), // 0: loop.PendingEventProto + (*InsertEventRequest)(nil), // 1: loop.InsertEventRequest + (*UpdateDeliveryRequest)(nil), // 2: loop.UpdateDeliveryRequest + (*ListEventsResponse)(nil), // 3: loop.ListEventsResponse + (*DeleteEventRequest)(nil), // 4: loop.DeleteEventRequest + (*DeleteEventsForTriggerRequest)(nil), // 5: loop.DeleteEventsForTriggerRequest + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 7: google.protobuf.Empty +} +var file_event_store_proto_depIdxs = []int32{ + 6, // 0: loop.PendingEventProto.first_at:type_name -> google.protobuf.Timestamp + 6, // 1: loop.PendingEventProto.last_sent_at:type_name -> google.protobuf.Timestamp + 0, // 2: loop.InsertEventRequest.event:type_name -> loop.PendingEventProto + 6, // 3: loop.UpdateDeliveryRequest.last_sent_at:type_name -> google.protobuf.Timestamp + 0, // 4: loop.ListEventsResponse.events:type_name -> loop.PendingEventProto + 1, // 5: loop.EventStore.Insert:input_type -> loop.InsertEventRequest + 2, // 6: loop.EventStore.UpdateDelivery:input_type -> loop.UpdateDeliveryRequest + 7, // 7: loop.EventStore.List:input_type -> google.protobuf.Empty + 4, // 8: loop.EventStore.DeleteEvent:input_type -> loop.DeleteEventRequest + 5, // 9: loop.EventStore.DeleteEventsForTrigger:input_type -> loop.DeleteEventsForTriggerRequest + 7, // 10: loop.EventStore.Insert:output_type -> google.protobuf.Empty + 7, // 11: loop.EventStore.UpdateDelivery:output_type -> google.protobuf.Empty + 3, // 12: loop.EventStore.List:output_type -> loop.ListEventsResponse + 7, // 13: loop.EventStore.DeleteEvent:output_type -> google.protobuf.Empty + 7, // 14: loop.EventStore.DeleteEventsForTrigger:output_type -> google.protobuf.Empty + 10, // [10:15] is the sub-list for method output_type + 5, // [5:10] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_event_store_proto_init() } +func file_event_store_proto_init() { + if File_event_store_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_event_store_proto_rawDesc), len(file_event_store_proto_rawDesc)), + NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_event_store_proto_goTypes, + DependencyIndexes: file_event_store_proto_depIdxs, + MessageInfos: file_event_store_proto_msgTypes, + }.Build() + File_event_store_proto = out.File + file_event_store_proto_goTypes = nil + file_event_store_proto_depIdxs = nil +} diff --git a/pkg/loop/internal/pb/event_store.proto b/pkg/loop/internal/pb/event_store.proto new file mode 100644 index 0000000000..d87ed13009 --- /dev/null +++ b/pkg/loop/internal/pb/event_store.proto @@ -0,0 +1,49 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb"; + +package loop; + +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +message PendingEventProto { + string trigger_id = 1; + string event_id = 2; + bytes payload = 3; + google.protobuf.Timestamp first_at = 4; + google.protobuf.Timestamp last_sent_at = 5; + int32 attempts = 6; +} + +message InsertEventRequest { + PendingEventProto event = 1; +} + +message UpdateDeliveryRequest { + string trigger_id = 1; + string event_id = 2; + google.protobuf.Timestamp last_sent_at = 3; + int32 attempts = 4; +} + +message ListEventsResponse { + repeated PendingEventProto events = 1; +} + +message DeleteEventRequest { + string trigger_id = 1; + string event_id = 2; +} + +message DeleteEventsForTriggerRequest { + string trigger_id = 1; +} + +service EventStore { + rpc Insert(InsertEventRequest) returns (google.protobuf.Empty); + rpc UpdateDelivery(UpdateDeliveryRequest) returns (google.protobuf.Empty); + rpc List(google.protobuf.Empty) returns (ListEventsResponse); + rpc DeleteEvent(DeleteEventRequest) returns (google.protobuf.Empty); + rpc DeleteEventsForTrigger(DeleteEventsForTriggerRequest) returns (google.protobuf.Empty); +} diff --git a/pkg/loop/internal/pb/event_store_grpc.pb.go b/pkg/loop/internal/pb/event_store_grpc.pb.go new file mode 100644 index 0000000000..6186b61e05 --- /dev/null +++ b/pkg/loop/internal/pb/event_store_grpc.pb.go @@ -0,0 +1,274 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.29.3 +// source: event_store.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + EventStore_Insert_FullMethodName = "/loop.EventStore/Insert" + EventStore_UpdateDelivery_FullMethodName = "/loop.EventStore/UpdateDelivery" + EventStore_List_FullMethodName = "/loop.EventStore/List" + EventStore_DeleteEvent_FullMethodName = "/loop.EventStore/DeleteEvent" + EventStore_DeleteEventsForTrigger_FullMethodName = "/loop.EventStore/DeleteEventsForTrigger" +) + +// EventStoreClient is the client API for EventStore service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type EventStoreClient interface { + Insert(ctx context.Context, in *InsertEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + UpdateDelivery(ctx context.Context, in *UpdateDeliveryRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + List(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListEventsResponse, error) + DeleteEvent(ctx context.Context, in *DeleteEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + DeleteEventsForTrigger(ctx context.Context, in *DeleteEventsForTriggerRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type eventStoreClient struct { + cc grpc.ClientConnInterface +} + +func NewEventStoreClient(cc grpc.ClientConnInterface) EventStoreClient { + return &eventStoreClient{cc} +} + +func (c *eventStoreClient) Insert(ctx context.Context, in *InsertEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, EventStore_Insert_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *eventStoreClient) UpdateDelivery(ctx context.Context, in *UpdateDeliveryRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, EventStore_UpdateDelivery_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *eventStoreClient) List(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListEventsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ListEventsResponse) + err := c.cc.Invoke(ctx, EventStore_List_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *eventStoreClient) DeleteEvent(ctx context.Context, in *DeleteEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, EventStore_DeleteEvent_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *eventStoreClient) DeleteEventsForTrigger(ctx context.Context, in *DeleteEventsForTriggerRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, EventStore_DeleteEventsForTrigger_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// EventStoreServer is the server API for EventStore service. +// All implementations must embed UnimplementedEventStoreServer +// for forward compatibility. +type EventStoreServer interface { + Insert(context.Context, *InsertEventRequest) (*emptypb.Empty, error) + UpdateDelivery(context.Context, *UpdateDeliveryRequest) (*emptypb.Empty, error) + List(context.Context, *emptypb.Empty) (*ListEventsResponse, error) + DeleteEvent(context.Context, *DeleteEventRequest) (*emptypb.Empty, error) + DeleteEventsForTrigger(context.Context, *DeleteEventsForTriggerRequest) (*emptypb.Empty, error) + mustEmbedUnimplementedEventStoreServer() +} + +// UnimplementedEventStoreServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedEventStoreServer struct{} + +func (UnimplementedEventStoreServer) Insert(context.Context, *InsertEventRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Insert not implemented") +} +func (UnimplementedEventStoreServer) UpdateDelivery(context.Context, *UpdateDeliveryRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateDelivery not implemented") +} +func (UnimplementedEventStoreServer) List(context.Context, *emptypb.Empty) (*ListEventsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method List not implemented") +} +func (UnimplementedEventStoreServer) DeleteEvent(context.Context, *DeleteEventRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method DeleteEvent not implemented") +} +func (UnimplementedEventStoreServer) DeleteEventsForTrigger(context.Context, *DeleteEventsForTriggerRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method DeleteEventsForTrigger not implemented") +} +func (UnimplementedEventStoreServer) mustEmbedUnimplementedEventStoreServer() {} +func (UnimplementedEventStoreServer) testEmbeddedByValue() {} + +// UnsafeEventStoreServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to EventStoreServer will +// result in compilation errors. +type UnsafeEventStoreServer interface { + mustEmbedUnimplementedEventStoreServer() +} + +func RegisterEventStoreServer(s grpc.ServiceRegistrar, srv EventStoreServer) { + // If the following call pancis, it indicates UnimplementedEventStoreServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&EventStore_ServiceDesc, srv) +} + +func _EventStore_Insert_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(InsertEventRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventStoreServer).Insert(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EventStore_Insert_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventStoreServer).Insert(ctx, req.(*InsertEventRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _EventStore_UpdateDelivery_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateDeliveryRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventStoreServer).UpdateDelivery(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EventStore_UpdateDelivery_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventStoreServer).UpdateDelivery(ctx, req.(*UpdateDeliveryRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _EventStore_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventStoreServer).List(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EventStore_List_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventStoreServer).List(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _EventStore_DeleteEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteEventRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventStoreServer).DeleteEvent(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EventStore_DeleteEvent_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventStoreServer).DeleteEvent(ctx, req.(*DeleteEventRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _EventStore_DeleteEventsForTrigger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteEventsForTriggerRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventStoreServer).DeleteEventsForTrigger(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EventStore_DeleteEventsForTrigger_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventStoreServer).DeleteEventsForTrigger(ctx, req.(*DeleteEventsForTriggerRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// EventStore_ServiceDesc is the grpc.ServiceDesc for EventStore service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var EventStore_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "loop.EventStore", + HandlerType: (*EventStoreServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Insert", + Handler: _EventStore_Insert_Handler, + }, + { + MethodName: "UpdateDelivery", + Handler: _EventStore_UpdateDelivery_Handler, + }, + { + MethodName: "List", + Handler: _EventStore_List_Handler, + }, + { + MethodName: "DeleteEvent", + Handler: _EventStore_DeleteEvent_Handler, + }, + { + MethodName: "DeleteEventsForTrigger", + Handler: _EventStore_DeleteEventsForTrigger_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "event_store.proto", +} diff --git a/pkg/loop/internal/pb/generate.go b/pkg/loop/internal/pb/generate.go index 4dfeb45817..32c2726f2e 100644 --- a/pkg/loop/internal/pb/generate.go +++ b/pkg/loop/internal/pb/generate.go @@ -11,4 +11,5 @@ //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative keyvalue_store.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative validate_config.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative orgresolver.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative event_store.proto package pb