From fe67a037a9649f1d69bae06def3a45b838fe4710 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Mon, 12 May 2025 14:06:03 +0500 Subject: [PATCH] implement supernode streaming for cascade action registration --- gen/supernode/action/cascade/service.pb.go | 178 +++++++++--- .../action/cascade/service_grpc.pb.go | 13 +- gen/supernode/action/service.pb.go | 177 ------------ gen/supernode/action/service_grpc.pb.go | 111 -------- pkg/logtrace/fields.go | 29 +- proto/supernode/action/cascade/service.proto | 36 ++- proto/supernode/action/service.proto | 18 -- sdk/adapters/supernodeservice/adapter.go | 81 +++++- sdk/adapters/supernodeservice/types.go | 17 +- sdk/adapters/supernodeservice/types_test.go | 37 +++ sdk/event/types.go | 33 ++- sdk/task/cascade.go | 5 +- supernode/cmd/keys_test.go | 24 ++ .../server/cascade/cascade_action_server.go | 45 +-- supernode/node/supernode/client/client.go | 50 ---- supernode/node/supernode/client/connection.go | 21 -- supernode/node/supernode/client/session.go | 13 - .../node/supernode/node_client_interface.go | 32 --- supernode/services/cascade/events.go | 18 ++ supernode/services/cascade/register.go | 75 +++-- supernode/services/cascade/task.go | 11 +- supernode/services/common/network_handler.go | 256 ------------------ supernode/services/common/node_peer.go | 82 ------ 23 files changed, 471 insertions(+), 891 deletions(-) delete mode 100644 gen/supernode/action/service.pb.go delete mode 100644 gen/supernode/action/service_grpc.pb.go delete mode 100644 proto/supernode/action/service.proto create mode 100644 sdk/adapters/supernodeservice/types_test.go create mode 100644 supernode/cmd/keys_test.go delete mode 100644 supernode/node/supernode/client/client.go delete mode 100644 supernode/node/supernode/client/connection.go delete mode 100644 supernode/node/supernode/client/session.go delete mode 100644 supernode/node/supernode/node_client_interface.go create mode 100644 supernode/services/cascade/events.go delete mode 100644 supernode/services/common/network_handler.go delete mode 100644 supernode/services/common/node_peer.go diff --git a/gen/supernode/action/cascade/service.pb.go b/gen/supernode/action/cascade/service.pb.go index a0c4a1d3..c0a20173 100644 --- a/gen/supernode/action/cascade/service.pb.go +++ b/gen/supernode/action/cascade/service.pb.go @@ -20,6 +20,82 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type SupernodeEventType int32 + +const ( + SupernodeEventType_UNKNOWN SupernodeEventType = 0 + SupernodeEventType_ACTION_RETRIEVED SupernodeEventType = 1 + SupernodeEventType_ACTION_FEE_VERIFIED SupernodeEventType = 2 + SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED SupernodeEventType = 3 + SupernodeEventType_METADATA_DECODED SupernodeEventType = 4 + SupernodeEventType_DATA_HASH_VERIFIED SupernodeEventType = 5 + SupernodeEventType_INPUT_ENCODED SupernodeEventType = 6 + SupernodeEventType_SIGNATURE_VERIFIED SupernodeEventType = 7 + SupernodeEventType_RQID_GENERATED SupernodeEventType = 8 + SupernodeEventType_RQID_VERIFIED SupernodeEventType = 9 + SupernodeEventType_ARTEFACTS_STORED SupernodeEventType = 10 + SupernodeEventType_ACTION_FINALIZED SupernodeEventType = 11 +) + +// Enum value maps for SupernodeEventType. +var ( + SupernodeEventType_name = map[int32]string{ + 0: "UNKNOWN", + 1: "ACTION_RETRIEVED", + 2: "ACTION_FEE_VERIFIED", + 3: "TOP_SUPERNODE_CHECK_PASSED", + 4: "METADATA_DECODED", + 5: "DATA_HASH_VERIFIED", + 6: "INPUT_ENCODED", + 7: "SIGNATURE_VERIFIED", + 8: "RQID_GENERATED", + 9: "RQID_VERIFIED", + 10: "ARTEFACTS_STORED", + 11: "ACTION_FINALIZED", + } + SupernodeEventType_value = map[string]int32{ + "UNKNOWN": 0, + "ACTION_RETRIEVED": 1, + "ACTION_FEE_VERIFIED": 2, + "TOP_SUPERNODE_CHECK_PASSED": 3, + "METADATA_DECODED": 4, + "DATA_HASH_VERIFIED": 5, + "INPUT_ENCODED": 6, + "SIGNATURE_VERIFIED": 7, + "RQID_GENERATED": 8, + "RQID_VERIFIED": 9, + "ARTEFACTS_STORED": 10, + "ACTION_FINALIZED": 11, + } +) + +func (x SupernodeEventType) Enum() *SupernodeEventType { + p := new(SupernodeEventType) + *p = x + return p +} + +func (x SupernodeEventType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (SupernodeEventType) Descriptor() protoreflect.EnumDescriptor { + return file_supernode_action_cascade_service_proto_enumTypes[0].Descriptor() +} + +func (SupernodeEventType) Type() protoreflect.EnumType { + return &file_supernode_action_cascade_service_proto_enumTypes[0] +} + +func (x SupernodeEventType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use SupernodeEventType.Descriptor instead. +func (SupernodeEventType) EnumDescriptor() ([]byte, []int) { + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{0} +} + type RegisterRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -202,8 +278,9 @@ type RegisterResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` - Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + EventType SupernodeEventType `protobuf:"varint,1,opt,name=event_type,json=eventType,proto3,enum=cascade.SupernodeEventType" json:"event_type,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + TxHash string `protobuf:"bytes,3,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` } func (x *RegisterResponse) Reset() { @@ -236,11 +313,11 @@ func (*RegisterResponse) Descriptor() ([]byte, []int) { return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{3} } -func (x *RegisterResponse) GetSuccess() bool { +func (x *RegisterResponse) GetEventType() SupernodeEventType { if x != nil { - return x.Success + return x.EventType } - return false + return SupernodeEventType_UNKNOWN } func (x *RegisterResponse) GetMessage() string { @@ -250,6 +327,13 @@ func (x *RegisterResponse) GetMessage() string { return "" } +func (x *RegisterResponse) GetTxHash() string { + if x != nil { + return x.TxHash + } + return "" +} + var File_supernode_action_cascade_service_proto protoreflect.FileDescriptor var file_supernode_action_cascade_service_proto_rawDesc = []byte{ @@ -270,21 +354,43 @@ var file_supernode_action_cascade_service_proto_rawDesc = []byte{ 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x46, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, - 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, - 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x53, 0x0a, 0x0e, - 0x43, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x41, - 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, 0x63, 0x61, 0x73, - 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, - 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, - 0x01, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x4c, 0x75, 0x6d, 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x73, - 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x73, 0x75, 0x70, - 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x63, 0x61, - 0x73, 0x63, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x81, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x0a, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, + 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x53, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, + 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, + 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x2a, 0x9c, 0x02, 0x0a, 0x12, 0x53, 0x75, 0x70, + 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, + 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x54, 0x52, 0x49, 0x45, 0x56, 0x45, 0x44, + 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x45, 0x45, + 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x54, + 0x4f, 0x50, 0x5f, 0x53, 0x55, 0x50, 0x45, 0x52, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x43, 0x48, 0x45, + 0x43, 0x4b, 0x5f, 0x50, 0x41, 0x53, 0x53, 0x45, 0x44, 0x10, 0x03, 0x12, 0x14, 0x0a, 0x10, 0x4d, + 0x45, 0x54, 0x41, 0x44, 0x41, 0x54, 0x41, 0x5f, 0x44, 0x45, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, + 0x04, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x41, 0x54, 0x41, 0x5f, 0x48, 0x41, 0x53, 0x48, 0x5f, 0x56, + 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x05, 0x12, 0x11, 0x0a, 0x0d, 0x49, 0x4e, 0x50, + 0x55, 0x54, 0x5f, 0x45, 0x4e, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, 0x06, 0x12, 0x16, 0x0a, 0x12, + 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, + 0x45, 0x44, 0x10, 0x07, 0x12, 0x12, 0x0a, 0x0e, 0x52, 0x51, 0x49, 0x44, 0x5f, 0x47, 0x45, 0x4e, + 0x45, 0x52, 0x41, 0x54, 0x45, 0x44, 0x10, 0x08, 0x12, 0x11, 0x0a, 0x0d, 0x52, 0x51, 0x49, 0x44, + 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x09, 0x12, 0x14, 0x0a, 0x10, 0x41, + 0x52, 0x54, 0x45, 0x46, 0x41, 0x43, 0x54, 0x53, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x45, 0x44, 0x10, + 0x0a, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x49, 0x4e, 0x41, + 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x0b, 0x32, 0x55, 0x0a, 0x0e, 0x43, 0x61, 0x73, 0x63, 0x61, + 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x43, 0x0a, 0x08, 0x52, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, + 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x42, + 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x75, 0x6d, + 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x73, 0x75, 0x70, 0x65, + 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, + 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x63, 0x61, 0x73, 0x63, 0x61, + 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -299,23 +405,26 @@ func file_supernode_action_cascade_service_proto_rawDescGZIP() []byte { return file_supernode_action_cascade_service_proto_rawDescData } +var file_supernode_action_cascade_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_supernode_action_cascade_service_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_supernode_action_cascade_service_proto_goTypes = []any{ - (*RegisterRequest)(nil), // 0: cascade.RegisterRequest - (*DataChunk)(nil), // 1: cascade.DataChunk - (*Metadata)(nil), // 2: cascade.Metadata - (*RegisterResponse)(nil), // 3: cascade.RegisterResponse + (SupernodeEventType)(0), // 0: cascade.SupernodeEventType + (*RegisterRequest)(nil), // 1: cascade.RegisterRequest + (*DataChunk)(nil), // 2: cascade.DataChunk + (*Metadata)(nil), // 3: cascade.Metadata + (*RegisterResponse)(nil), // 4: cascade.RegisterResponse } var file_supernode_action_cascade_service_proto_depIdxs = []int32{ - 1, // 0: cascade.RegisterRequest.chunk:type_name -> cascade.DataChunk - 2, // 1: cascade.RegisterRequest.metadata:type_name -> cascade.Metadata - 0, // 2: cascade.CascadeService.Register:input_type -> cascade.RegisterRequest - 3, // 3: cascade.CascadeService.Register:output_type -> cascade.RegisterResponse - 3, // [3:4] is the sub-list for method output_type - 2, // [2:3] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 2, // 0: cascade.RegisterRequest.chunk:type_name -> cascade.DataChunk + 3, // 1: cascade.RegisterRequest.metadata:type_name -> cascade.Metadata + 0, // 2: cascade.RegisterResponse.event_type:type_name -> cascade.SupernodeEventType + 1, // 3: cascade.CascadeService.Register:input_type -> cascade.RegisterRequest + 4, // 4: cascade.CascadeService.Register:output_type -> cascade.RegisterResponse + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_supernode_action_cascade_service_proto_init() } @@ -332,13 +441,14 @@ func file_supernode_action_cascade_service_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_supernode_action_cascade_service_proto_rawDesc, - NumEnums: 0, + NumEnums: 1, NumMessages: 4, NumExtensions: 0, NumServices: 1, }, GoTypes: file_supernode_action_cascade_service_proto_goTypes, DependencyIndexes: file_supernode_action_cascade_service_proto_depIdxs, + EnumInfos: file_supernode_action_cascade_service_proto_enumTypes, MessageInfos: file_supernode_action_cascade_service_proto_msgTypes, }.Build() File_supernode_action_cascade_service_proto = out.File diff --git a/gen/supernode/action/cascade/service_grpc.pb.go b/gen/supernode/action/cascade/service_grpc.pb.go index cd23a36e..2dc82af2 100644 --- a/gen/supernode/action/cascade/service_grpc.pb.go +++ b/gen/supernode/action/cascade/service_grpc.pb.go @@ -26,7 +26,7 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type CascadeServiceClient interface { - Register(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[RegisterRequest, RegisterResponse], error) + Register(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[RegisterRequest, RegisterResponse], error) } type cascadeServiceClient struct { @@ -37,7 +37,7 @@ func NewCascadeServiceClient(cc grpc.ClientConnInterface) CascadeServiceClient { return &cascadeServiceClient{cc} } -func (c *cascadeServiceClient) Register(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[RegisterRequest, RegisterResponse], error) { +func (c *cascadeServiceClient) Register(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[RegisterRequest, RegisterResponse], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &CascadeService_ServiceDesc.Streams[0], CascadeService_Register_FullMethodName, cOpts...) if err != nil { @@ -48,13 +48,13 @@ func (c *cascadeServiceClient) Register(ctx context.Context, opts ...grpc.CallOp } // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type CascadeService_RegisterClient = grpc.ClientStreamingClient[RegisterRequest, RegisterResponse] +type CascadeService_RegisterClient = grpc.BidiStreamingClient[RegisterRequest, RegisterResponse] // CascadeServiceServer is the server API for CascadeService service. // All implementations must embed UnimplementedCascadeServiceServer // for forward compatibility. type CascadeServiceServer interface { - Register(grpc.ClientStreamingServer[RegisterRequest, RegisterResponse]) error + Register(grpc.BidiStreamingServer[RegisterRequest, RegisterResponse]) error mustEmbedUnimplementedCascadeServiceServer() } @@ -65,7 +65,7 @@ type CascadeServiceServer interface { // pointer dereference when methods are called. type UnimplementedCascadeServiceServer struct{} -func (UnimplementedCascadeServiceServer) Register(grpc.ClientStreamingServer[RegisterRequest, RegisterResponse]) error { +func (UnimplementedCascadeServiceServer) Register(grpc.BidiStreamingServer[RegisterRequest, RegisterResponse]) error { return status.Errorf(codes.Unimplemented, "method Register not implemented") } func (UnimplementedCascadeServiceServer) mustEmbedUnimplementedCascadeServiceServer() {} @@ -94,7 +94,7 @@ func _CascadeService_Register_Handler(srv interface{}, stream grpc.ServerStream) } // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type CascadeService_RegisterServer = grpc.ClientStreamingServer[RegisterRequest, RegisterResponse] +type CascadeService_RegisterServer = grpc.BidiStreamingServer[RegisterRequest, RegisterResponse] // CascadeService_ServiceDesc is the grpc.ServiceDesc for CascadeService service. // It's only intended for direct use with grpc.RegisterService, @@ -107,6 +107,7 @@ var CascadeService_ServiceDesc = grpc.ServiceDesc{ { StreamName: "Register", Handler: _CascadeService_Register_Handler, + ServerStreams: true, ClientStreams: true, }, }, diff --git a/gen/supernode/action/service.pb.go b/gen/supernode/action/service.pb.go deleted file mode 100644 index 03021e39..00000000 --- a/gen/supernode/action/service.pb.go +++ /dev/null @@ -1,177 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.35.1 -// protoc v3.12.4 -// source: service.proto - -package action - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// Request and response messages -type GetHealthCheckRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *GetHealthCheckRequest) Reset() { - *x = GetHealthCheckRequest{} - mi := &file_service_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *GetHealthCheckRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetHealthCheckRequest) ProtoMessage() {} - -func (x *GetHealthCheckRequest) ProtoReflect() protoreflect.Message { - mi := &file_service_proto_msgTypes[0] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetHealthCheckRequest.ProtoReflect.Descriptor instead. -func (*GetHealthCheckRequest) Descriptor() ([]byte, []int) { - return file_service_proto_rawDescGZIP(), []int{0} -} - -type GetHealthCheckResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Status string `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` -} - -func (x *GetHealthCheckResponse) Reset() { - *x = GetHealthCheckResponse{} - mi := &file_service_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *GetHealthCheckResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetHealthCheckResponse) ProtoMessage() {} - -func (x *GetHealthCheckResponse) ProtoReflect() protoreflect.Message { - mi := &file_service_proto_msgTypes[1] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetHealthCheckResponse.ProtoReflect.Descriptor instead. -func (*GetHealthCheckResponse) Descriptor() ([]byte, []int) { - return file_service_proto_rawDescGZIP(), []int{1} -} - -func (x *GetHealthCheckResponse) GetStatus() string { - if x != nil { - return x.Status - } - return "" -} - -var File_service_proto protoreflect.FileDescriptor - -var file_service_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x17, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x48, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x22, 0x30, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, - 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x32, 0x5d, 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x12, 0x4c, 0x0a, 0x0b, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x6c, - 0x74, 0x68, 0x12, 0x1d, 0x2e, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x48, - 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1e, 0x2e, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x48, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x4c, 0x75, 0x6d, 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x73, - 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x73, 0x75, 0x70, - 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_service_proto_rawDescOnce sync.Once - file_service_proto_rawDescData = file_service_proto_rawDesc -) - -func file_service_proto_rawDescGZIP() []byte { - file_service_proto_rawDescOnce.Do(func() { - file_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_service_proto_rawDescData) - }) - return file_service_proto_rawDescData -} - -var file_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_service_proto_goTypes = []any{ - (*GetHealthCheckRequest)(nil), // 0: action.GetHealthCheckRequest - (*GetHealthCheckResponse)(nil), // 1: action.GetHealthCheckResponse -} -var file_service_proto_depIdxs = []int32{ - 0, // 0: action.ActionService.CheckHealth:input_type -> action.GetHealthCheckRequest - 1, // 1: action.ActionService.CheckHealth:output_type -> action.GetHealthCheckResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_service_proto_init() } -func file_service_proto_init() { - if File_service_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_service_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_service_proto_goTypes, - DependencyIndexes: file_service_proto_depIdxs, - MessageInfos: file_service_proto_msgTypes, - }.Build() - File_service_proto = out.File - file_service_proto_rawDesc = nil - file_service_proto_goTypes = nil - file_service_proto_depIdxs = nil -} diff --git a/gen/supernode/action/service_grpc.pb.go b/gen/supernode/action/service_grpc.pb.go deleted file mode 100644 index aec9dc9b..00000000 --- a/gen/supernode/action/service_grpc.pb.go +++ /dev/null @@ -1,111 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v3.12.4 -// source: service.proto - -package action - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -const ( - ActionService_CheckHealth_FullMethodName = "/action.ActionService/CheckHealth" -) - -// ActionServiceClient is the client API for ActionService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type ActionServiceClient interface { - // Get Health Check Status - CheckHealth(ctx context.Context, in *GetHealthCheckRequest, opts ...grpc.CallOption) (*GetHealthCheckResponse, error) -} - -type actionServiceClient struct { - cc grpc.ClientConnInterface -} - -func NewActionServiceClient(cc grpc.ClientConnInterface) ActionServiceClient { - return &actionServiceClient{cc} -} - -func (c *actionServiceClient) CheckHealth(ctx context.Context, in *GetHealthCheckRequest, opts ...grpc.CallOption) (*GetHealthCheckResponse, error) { - out := new(GetHealthCheckResponse) - err := c.cc.Invoke(ctx, ActionService_CheckHealth_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// ActionServiceServer is the server API for ActionService service. -// All implementations must embed UnimplementedActionServiceServer -// for forward compatibility -type ActionServiceServer interface { - // Get Health Check Status - CheckHealth(context.Context, *GetHealthCheckRequest) (*GetHealthCheckResponse, error) - mustEmbedUnimplementedActionServiceServer() -} - -// UnimplementedActionServiceServer must be embedded to have forward compatible implementations. -type UnimplementedActionServiceServer struct { -} - -func (UnimplementedActionServiceServer) CheckHealth(context.Context, *GetHealthCheckRequest) (*GetHealthCheckResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method CheckHealth not implemented") -} -func (UnimplementedActionServiceServer) mustEmbedUnimplementedActionServiceServer() {} - -// UnsafeActionServiceServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to ActionServiceServer will -// result in compilation errors. -type UnsafeActionServiceServer interface { - mustEmbedUnimplementedActionServiceServer() -} - -func RegisterActionServiceServer(s grpc.ServiceRegistrar, srv ActionServiceServer) { - s.RegisterService(&ActionService_ServiceDesc, srv) -} - -func _ActionService_CheckHealth_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(GetHealthCheckRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ActionServiceServer).CheckHealth(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: ActionService_CheckHealth_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ActionServiceServer).CheckHealth(ctx, req.(*GetHealthCheckRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// ActionService_ServiceDesc is the grpc.ServiceDesc for ActionService service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var ActionService_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "action.ActionService", - HandlerType: (*ActionServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "CheckHealth", - Handler: _ActionService_CheckHealth_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "service.proto", -} diff --git a/pkg/logtrace/fields.go b/pkg/logtrace/fields.go index 9b28ac85..c3f0e9fe 100644 --- a/pkg/logtrace/fields.go +++ b/pkg/logtrace/fields.go @@ -4,21 +4,16 @@ package logtrace type Fields map[string]interface{} const ( - FieldCorrelationID = "correlation_id" - FieldMethod = "method" - FieldModule = "module" - FieldError = "error" - FieldStatus = "status" - FieldBlockHeight = "block_height" - FieldLimit = "limit" - FieldSupernodeState = "supernode_state" - FieldRequest = "request" - FieldSupernodeAccountAddress = "supernode_account_address" - FieldIsPrimary = "is_primary" - FieldStackTrace = "stack_trace" - - ValueLumeraSDK = "lumera-sdk" - ValueActionSDK = "action-sdk" - ValueTransaction = "cosmos-tx" - ValueBaseTendermint = "tendermint" + FieldCorrelationID = "correlation_id" + FieldMethod = "method" + FieldModule = "module" + FieldError = "error" + FieldStatus = "status" + FieldBlockHeight = "block_height" + FieldCreator = "creator" + FieldPrice = "price" + FieldSupernodeState = "supernode_state" + FieldRequest = "request" + FieldStackTrace = "stack_trace" + FieldTxHash = "tx_hash" ) diff --git a/proto/supernode/action/cascade/service.proto b/proto/supernode/action/cascade/service.proto index 3987231a..c52f8b49 100644 --- a/proto/supernode/action/cascade/service.proto +++ b/proto/supernode/action/cascade/service.proto @@ -3,26 +3,42 @@ package cascade; option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"; service CascadeService { - rpc Register (stream RegisterRequest) returns (RegisterResponse); + rpc Register (stream RegisterRequest) returns (stream RegisterResponse); } message RegisterRequest { - oneof request_type { - DataChunk chunk = 1; - Metadata metadata = 2; - } + oneof request_type { + DataChunk chunk = 1; + Metadata metadata = 2; + } } message DataChunk { - bytes data = 1; + bytes data = 1; } message Metadata { - string task_id = 1; - string action_id = 2; + string task_id = 1; + string action_id = 2; } message RegisterResponse { - bool success = 1; - string message = 2; + SupernodeEventType event_type = 1; + string message = 2; + string tx_hash = 3; +} + +enum SupernodeEventType { + UNKNOWN = 0; + ACTION_RETRIEVED = 1; + ACTION_FEE_VERIFIED = 2; + TOP_SUPERNODE_CHECK_PASSED = 3; + METADATA_DECODED = 4; + DATA_HASH_VERIFIED = 5; + INPUT_ENCODED = 6; + SIGNATURE_VERIFIED = 7; + RQID_GENERATED = 8; + RQID_VERIFIED = 9; + ARTEFACTS_STORED = 10; + ACTION_FINALIZED = 11; } \ No newline at end of file diff --git a/proto/supernode/action/service.proto b/proto/supernode/action/service.proto deleted file mode 100644 index 923a09d2..00000000 --- a/proto/supernode/action/service.proto +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; - -package action; - -option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action"; - -// Service definition -service ActionService { -// Get Health Check Status -rpc CheckHealth(GetHealthCheckRequest) returns (GetHealthCheckResponse); -} - -// Request and response messages -message GetHealthCheckRequest {} - -message GetHealthCheckResponse { -string status = 1; -} \ No newline at end of file diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index 24871fa9..3588bb63 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -3,10 +3,13 @@ package supernodeservice import ( "context" "fmt" + "io" + "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" + "github.com/LumeraProtocol/supernode/pkg/net" + "github.com/LumeraProtocol/supernode/sdk/event" "github.com/LumeraProtocol/supernode/sdk/log" - "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" "google.golang.org/grpc" ) @@ -30,6 +33,8 @@ func NewCascadeAdapter(ctx context.Context, client cascade.CascadeServiceClient, func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *CascadeSupernodeRegisterRequest, opts ...grpc.CallOption) (*CascadeSupernodeRegisterResponse, error) { // Create the client stream + ctx = net.AddCorrelationID(ctx) + stream, err := a.client.Register(ctx, opts...) if err != nil { a.logger.Error(ctx, "Failed to create register stream", @@ -95,18 +100,78 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca a.logger.Debug(ctx, "Sent metadata", "TaskId", in.TaskId, "ActionID", in.ActionID) - resp, err := stream.CloseAndRecv() - if err != nil { + if err := stream.CloseSend(); err != nil { a.logger.Error(ctx, "Failed to close stream and receive response", "TaskId", in.TaskId, "ActionID", in.ActionID, "error", err) return nil, fmt.Errorf("failed to receive response: %w", err) } - response := &CascadeSupernodeRegisterResponse{ - Success: resp.Success, - Message: resp.Message, + // Handle streaming responses from supernode + var finalResp *cascade.RegisterResponse + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("failed to receive server response: %w", err) + } + + // Log the streamed progress update + a.logger.Info(ctx, "Supernode progress update received", + "event_type", resp.EventType, + "message", resp.Message, + "tx_hash", resp.TxHash, + "task_id", in.TaskId, + "action_id", in.ActionID, + ) + + if in.EventLogger != nil { + in.EventLogger(ctx, toSdkEvent(resp.EventType), resp.Message, nil) + } + + // Optionally capture the final response + if resp.TxHash != "" { + finalResp = resp + } + } + + if finalResp == nil { + return nil, fmt.Errorf("no final response with tx_hash received") } - a.logger.Info(ctx, "Successfully registered supernode data", "TaskId", in.TaskId, "ActionID", in.ActionID, "dataSize", totalBytes, "success", resp.Success, "message", resp.Message) + return &CascadeSupernodeRegisterResponse{ + Success: true, + Message: finalResp.Message, + TxHash: finalResp.TxHash, + }, nil +} - return response, nil +// toSdkEvent converts a supernode-side enum value into an internal SDK EventType. +func toSdkEvent(e cascade.SupernodeEventType) event.EventType { + switch e { + case cascade.SupernodeEventType_ACTION_RETRIEVED: + return event.TaskProgressActionRetrievedBySupernode + case cascade.SupernodeEventType_ACTION_FEE_VERIFIED: + return event.TaskProgressActionFeeValidated + case cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED: + return event.TaskProgressTopSupernodeCheckValidated + case cascade.SupernodeEventType_METADATA_DECODED: + return event.TaskProgressCascadeMetadataDecoded + case cascade.SupernodeEventType_DATA_HASH_VERIFIED: + return event.TaskProgressDataHashVerified + case cascade.SupernodeEventType_INPUT_ENCODED: + return event.TaskProgressInputDataEncoded + case cascade.SupernodeEventType_SIGNATURE_VERIFIED: + return event.TaskProgressSignatureVerified + case cascade.SupernodeEventType_RQID_GENERATED: + return event.TaskProgressRQIDFilesGenerated + case cascade.SupernodeEventType_RQID_VERIFIED: + return event.TaskProgressRQIDsVerified + case cascade.SupernodeEventType_ARTEFACTS_STORED: + return event.TaskProgressArtefactsStored + case cascade.SupernodeEventType_ACTION_FINALIZED: + return event.TaskProgressActionFinalized + default: + return event.EventType("task.progress.unknown") + } } diff --git a/sdk/adapters/supernodeservice/types.go b/sdk/adapters/supernodeservice/types.go index d64b914b..6a11f74e 100644 --- a/sdk/adapters/supernodeservice/types.go +++ b/sdk/adapters/supernodeservice/types.go @@ -4,17 +4,28 @@ import ( "context" "google.golang.org/grpc" + + "github.com/LumeraProtocol/supernode/sdk/event" +) + +type LoggerFunc func( + ctx context.Context, + eventType event.EventType, + message string, + data map[string]interface{}, ) type CascadeSupernodeRegisterRequest struct { - Data []byte - ActionID string - TaskId string + Data []byte + ActionID string + TaskId string + EventLogger LoggerFunc } type CascadeSupernodeRegisterResponse struct { Success bool Message string + TxHash string } //go:generate mockery --name=CascadeServiceClient --output=testutil/mocks --outpkg=mocks --filename=cascade_service_mock.go diff --git a/sdk/adapters/supernodeservice/types_test.go b/sdk/adapters/supernodeservice/types_test.go new file mode 100644 index 00000000..ea18d042 --- /dev/null +++ b/sdk/adapters/supernodeservice/types_test.go @@ -0,0 +1,37 @@ +package supernodeservice + +import ( + "testing" + + "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" + "github.com/LumeraProtocol/supernode/sdk/event" + "github.com/stretchr/testify/require" +) + +func TestTranslateSupernodeEvent(t *testing.T) { + tests := []struct { + name string + input cascade.SupernodeEventType + expected event.EventType + }{ + {"ACTION_RETRIEVED", cascade.SupernodeEventType_ACTION_RETRIEVED, event.TaskProgressActionRetrievedBySupernode}, + {"ACTION_FEE_VERIFIED", cascade.SupernodeEventType_ACTION_FEE_VERIFIED, event.TaskProgressActionFeeValidated}, + {"TOP_SUPERNODE_CHECK_PASSED", cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED, event.TaskProgressTopSupernodeCheckValidated}, + {"METADATA_DECODED", cascade.SupernodeEventType_METADATA_DECODED, event.TaskProgressCascadeMetadataDecoded}, + {"DATA_HASH_VERIFIED", cascade.SupernodeEventType_DATA_HASH_VERIFIED, event.TaskProgressDataHashVerified}, + {"INPUT_ENCODED", cascade.SupernodeEventType_INPUT_ENCODED, event.TaskProgressInputDataEncoded}, + {"SIGNATURE_VERIFIED", cascade.SupernodeEventType_SIGNATURE_VERIFIED, event.TaskProgressSignatureVerified}, + {"RQID_GENERATED", cascade.SupernodeEventType_RQID_GENERATED, event.TaskProgressRQIDFilesGenerated}, + {"RQID_VERIFIED", cascade.SupernodeEventType_RQID_VERIFIED, event.TaskProgressRQIDsVerified}, + {"ARTEFACTS_STORED", cascade.SupernodeEventType_ARTEFACTS_STORED, event.TaskProgressArtefactsStored}, + {"ACTION_FINALIZED", cascade.SupernodeEventType_ACTION_FINALIZED, event.TaskProgressActionFinalized}, + {"UNKNOWN_TYPE", cascade.SupernodeEventType(999), event.EventType("task.progress.unknown")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := toSdkEvent(tt.input) + require.Equal(t, tt.expected, actual) + }) + } +} diff --git a/sdk/event/types.go b/sdk/event/types.go index 7f6df100..c0002d20 100644 --- a/sdk/event/types.go +++ b/sdk/event/types.go @@ -14,17 +14,28 @@ type EventType string // These events are used to track the progress of tasks // and to notify subscribers about important changes in the system. const ( - TaskStarted EventType = "task.started" - TaskProgressActionVerified EventType = "task.progress.action_verified" - TaskProgressActionVerificationFailed EventType = "task.progress.action_verification_failed" - TaskProgressSupernodesFound EventType = "task.progress.supernode_found" - TaskProgressSupernodesUnavailable EventType = "task.progress.supernodes_unavailable" - TaskProgressRegistrationInProgress EventType = "task.progress.registration_in_progress" - TaskProgressRegistrationFailure EventType = "task.progress.registration_failure" - TaskProgressRegistrationSuccessful EventType = "task.progress.registration_successful" - TaskCompleted EventType = "task.completed" - TxhasReceived EventType = "txhash.received" - TaskFailed EventType = "task.failed" + TaskStarted EventType = "task.started" + TaskProgressActionVerified EventType = "task.progress.action_verified" + TaskProgressActionVerificationFailed EventType = "task.progress.action_verification_failed" + TaskProgressSupernodesFound EventType = "task.progress.supernode_found" + TaskProgressSupernodesUnavailable EventType = "task.progress.supernodes_unavailable" + TaskProgressActionRetrievedBySupernode EventType = "task.progress.action_retrieved_by_supernode" + TaskProgressActionFeeValidated EventType = "task.progress.action_fee_validated" + TaskProgressTopSupernodeCheckValidated EventType = "task.progress.top_sn_check_validated" + TaskProgressArtefactsStored EventType = "task.progress.artefacts_stored" + TaskProgressCascadeMetadataDecoded EventType = "task.progress.cascade_metadata_decoded" + TaskProgressDataHashVerified EventType = "task.progress.data_hash_verified" + TaskProgressInputDataEncoded EventType = "task.progress.input_data_encoded" + TaskProgressSignatureVerified EventType = "task.progress.signature_verified" + TaskProgressRQIDFilesGenerated EventType = "task.progress.rq_id_files_generated" + TaskProgressRQIDsVerified EventType = "task.progress.rq_ids_verified" + TaskProgressActionFinalized EventType = "task.progress.action_finalized" + TaskProgressRegistrationInProgress EventType = "task.progress.registration_in_progress" + TaskProgressRegistrationFailure EventType = "task.progress.registration_failure" + TaskProgressRegistrationSuccessful EventType = "task.progress.registration_successful" + TaskCompleted EventType = "task.completed" + TxhasReceived EventType = "txhash.received" + TaskFailed EventType = "task.failed" ) // Task progress steps in order diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index 935c4788..b058cc77 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -183,6 +183,9 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum uploadCtx, cancel := context.WithTimeout(ctx, registrationTimeout) defer cancel() + req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data map[string]interface{}) { + t.logEvent(ctx, evt, msg, data) + } resp, err := client.RegisterCascade(uploadCtx, req) if err != nil { return fmt.Errorf("upload to %s: %w", sn.CosmosAddress, err) @@ -191,7 +194,7 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum return fmt.Errorf("upload rejected by %s: %s", sn.CosmosAddress, resp.Message) } - txhash := CleanTxHash(resp.Message) + txhash := CleanTxHash(resp.TxHash) t.logEvent(ctx, event.TxhasReceived, "txhash received", map[string]interface{}{ "txhash": txhash, "supernode": sn.CosmosAddress, diff --git a/supernode/cmd/keys_test.go b/supernode/cmd/keys_test.go new file mode 100644 index 00000000..670c4b93 --- /dev/null +++ b/supernode/cmd/keys_test.go @@ -0,0 +1,24 @@ +package cmd + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKeysCmdMetadata(t *testing.T) { + assert.Equal(t, "keys", keysCmd.Use) + assert.Contains(t, keysCmd.Short, "Manage keys") + assert.Contains(t, keysCmd.Long, "Manage keys for the Supernode") +} + +func TestKeysCmdRegistered(t *testing.T) { + found := false + for _, cmd := range rootCmd.Commands() { + if cmd == keysCmd { + found = true + break + } + } + assert.True(t, found, "keysCmd should be registered with rootCmd") +} diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 542e2335..8293a47f 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -7,6 +7,7 @@ import ( pb "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" "github.com/LumeraProtocol/supernode/pkg/logtrace" cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade" + "google.golang.org/grpc" ) @@ -53,13 +54,14 @@ func (server *CascadeActionServer) Register(stream pb.CascadeService_RegisterSer // Check which type of message we received switch x := req.RequestType.(type) { case *pb.RegisterRequest_Chunk: - // Add data chunk to our collection - allData = append(allData, x.Chunk.Data...) - logtrace.Info(ctx, "received data chunk", logtrace.Fields{ - "chunk_size": len(x.Chunk.Data), - "total_size_so_far": len(allData), - }) - + if x.Chunk != nil { + // Add data chunk to our collection + allData = append(allData, x.Chunk.Data...) + logtrace.Info(ctx, "received data chunk", logtrace.Fields{ + "chunk_size": len(x.Chunk.Data), + "total_size_so_far": len(allData), + }) + } case *pb.RegisterRequest_Metadata: // Store metadata - this should be the final message metadata = x.Metadata @@ -78,21 +80,32 @@ func (server *CascadeActionServer) Register(stream pb.CascadeService_RegisterSer // Process the complete data task := server.service.NewCascadeRegistrationTask() - res, err := task.Register(ctx, &cascadeService.RegisterRequest{ + err := task.Register(ctx, &cascadeService.RegisterRequest{ TaskID: metadata.TaskId, ActionID: metadata.ActionId, Data: allData, + }, func(resp *cascadeService.RegisterResponse) error { + grpcResp := &pb.RegisterResponse{ + EventType: pb.SupernodeEventType(resp.EventType), + Message: resp.Message, + TxHash: resp.TxHash, + } + if err := stream.Send(grpcResp); err != nil { + logtrace.Error(ctx, "failed to send response to client", logtrace.Fields{ + logtrace.FieldError: err.Error(), + }) + return err + } + return nil }) if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to upload input data", fields) - return fmt.Errorf("cascade services upload input data error: %w", err) + logtrace.Error(ctx, "registration task failed", logtrace.Fields{ + logtrace.FieldError: err.Error(), + }) + return fmt.Errorf("registration failed: %w", err) } - // Send the response - return stream.SendMsg(&pb.RegisterResponse{ - Success: res.Success, - Message: res.Message, - }) + logtrace.Info(ctx, "cascade registration completed successfully", fields) + return nil } diff --git a/supernode/node/supernode/client/client.go b/supernode/node/supernode/client/client.go deleted file mode 100644 index 3c1d6a30..00000000 --- a/supernode/node/supernode/client/client.go +++ /dev/null @@ -1,50 +0,0 @@ -package client - -import ( - "context" - "time" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" - netgrpcclient "github.com/LumeraProtocol/supernode/pkg/net/grpc/client" - "github.com/LumeraProtocol/supernode/pkg/random" - node "github.com/LumeraProtocol/supernode/supernode/node/supernode" - "github.com/cosmos/cosmos-sdk/crypto/keyring" - _ "google.golang.org/grpc/keepalive" -) - -// this implements SN's GRPC methods that call another SN during Cascade Registration -// meaning - these methods implements client side of SN to SN GRPC communication - -type Client struct { - *netgrpcclient.Client - KeyRing keyring.Keyring - SuperNodeAccAddress string -} - -// Connect implements node.Client.Connect() -func (c *Client) Connect(ctx context.Context, address string) (node.ConnectionInterface, error) { - clientOptions := netgrpcclient.DefaultClientOptions() - clientOptions.ConnWaitTime = 30 * time.Minute - clientOptions.MinConnectTimeout = 30 * time.Minute - clientOptions.EnableRetries = false - - id, _ := random.String(8, random.Base62Chars) - - grpcConn, err := c.Client.Connect(ctx, address, clientOptions) - if err != nil { - log.WithContext(ctx).WithError(err).Error("DialContext err") - return nil, errors.Errorf("dial address %s: %w", address, err) - } - - log.WithContext(ctx).Debugf("Connected to %s", address) - - conn := newClientConn(id, grpcConn) - - go func() { - //<-conn.Done() - log.WithContext(ctx).Debugf("Disconnected %s", grpcConn.Target()) - }() - - return conn, nil -} diff --git a/supernode/node/supernode/client/connection.go b/supernode/node/supernode/client/connection.go deleted file mode 100644 index 59c7d631..00000000 --- a/supernode/node/supernode/client/connection.go +++ /dev/null @@ -1,21 +0,0 @@ -package client - -import ( - "google.golang.org/grpc" - - "github.com/LumeraProtocol/supernode/supernode/node/supernode" -) - -// clientConn represents grpc client connection. -type clientConn struct { - *grpc.ClientConn - - id string -} - -func newClientConn(id string, conn *grpc.ClientConn) supernode.ConnectionInterface { - return &clientConn{ - ClientConn: conn, - id: id, - } -} diff --git a/supernode/node/supernode/client/session.go b/supernode/node/supernode/client/session.go deleted file mode 100644 index d325e7d6..00000000 --- a/supernode/node/supernode/client/session.go +++ /dev/null @@ -1,13 +0,0 @@ -package client - -import ( - "context" - - "github.com/LumeraProtocol/supernode/proto" - "google.golang.org/grpc/metadata" -) - -func ContextWithMDSessID(ctx context.Context, sessID string) context.Context { - md := metadata.Pairs(proto.MetadataKeySessID, sessID) - return metadata.NewOutgoingContext(ctx, md) -} diff --git a/supernode/node/supernode/node_client_interface.go b/supernode/node/supernode/node_client_interface.go deleted file mode 100644 index 019b56bc..00000000 --- a/supernode/node/supernode/node_client_interface.go +++ /dev/null @@ -1,32 +0,0 @@ -package supernode - -import ( - "context" -) - -// ClientInterface represents a base connection interface. -type ClientInterface interface { - // Connect connects to the server at the given address. - Connect(ctx context.Context, address string) (ConnectionInterface, error) -} - -// ConnectionInterface represents a client connection -type ConnectionInterface interface { - // Close closes connection. - Close() error -} - -// SuperNodePeerAPIInterface base interface for other Node API interfaces -type SuperNodePeerAPIInterface interface { - // SessID returns the taskID received from the server during the handshake. - SessID() (taskID string) - // Session sets up an initial connection with primary supernode, by telling sessID and its own nodeID. - Session(ctx context.Context, nodeID, sessID string) (err error) -} - -// revive:disable:exported - -// NodeMaker interface to make concrete node types -type NodeMaker interface { - MakeNode(conn ConnectionInterface) SuperNodePeerAPIInterface -} diff --git a/supernode/services/cascade/events.go b/supernode/services/cascade/events.go new file mode 100644 index 00000000..6f9239f3 --- /dev/null +++ b/supernode/services/cascade/events.go @@ -0,0 +1,18 @@ +package cascade + +type SupernodeEventType int + +const ( + SupernodeEventTypeUNKNOWN SupernodeEventType = 0 + SupernodeEventTypeActionRetrieved SupernodeEventType = 1 + SupernodeEventTypeActionFeeVerified SupernodeEventType = 2 + SupernodeEventTypeTopSupernodeCheckPassed SupernodeEventType = 3 + SupernodeEventTypeMetadataDecoded SupernodeEventType = 4 + SupernodeEventTypeDataHashVerified SupernodeEventType = 5 + SupernodeEventTypeInputEncoded SupernodeEventType = 6 + SupernodeEventTypeSignatureVerified SupernodeEventType = 7 + SupernodeEventTypeRQIDsGenerated SupernodeEventType = 8 + SupernodeEventTypeRqIDsVerified SupernodeEventType = 9 + SupernodeEventTypeArtefactsStored SupernodeEventType = 10 + SupernodeEventTypeActionFinalized SupernodeEventType = 11 +) diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go index ec79f6e4..c044f3bc 100644 --- a/supernode/services/cascade/register.go +++ b/supernode/services/cascade/register.go @@ -2,7 +2,6 @@ package cascade import ( "context" - "fmt" "github.com/LumeraProtocol/supernode/pkg/logtrace" ) @@ -16,8 +15,9 @@ type RegisterRequest struct { // RegisterResponse contains the result of upload type RegisterResponse struct { - Success bool - Message string + EventType SupernodeEventType + Message string + TxHash string } // Register processes the upload request for cascade input data. @@ -35,79 +35,106 @@ type RegisterResponse struct { // 7- Generate RQ-ID files from the layout that we generated locally and then match those with the ones in the action // 8- Verify the IDs in the layout and the metadata (the IDs should match the ones in the action) // 9- Store the artefacts in P2P Storage (the redundant metadata files and the symbols from the symbols dir) -func (task *CascadeRegistrationTask) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) { +func (task *CascadeRegistrationTask) Register( + ctx context.Context, + req *RegisterRequest, + send func(resp *RegisterResponse) error, +) error { + fields := logtrace.Fields{logtrace.FieldMethod: "Register", logtrace.FieldRequest: req} + logtrace.Info(ctx, "cascade-action-registration request received", fields) /* 1. Fetch & validate action -------------------------------------------------- */ action, err := task.fetchAction(ctx, req.ActionID, fields) if err != nil { - return nil, err + return err } + fields[logtrace.FieldBlockHeight] = action.BlockHeight + fields[logtrace.FieldCreator] = action.Creator + fields[logtrace.FieldStatus] = action.State + fields[logtrace.FieldPrice] = action.Price + logtrace.Info(ctx, "action has been retrieved", fields) + task.streamEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", "", send) /* 2. Verify action fee -------------------------------------------------------- */ if err := task.verifyActionFee(ctx, action, req.Data, fields); err != nil { - return nil, err + return err } + logtrace.Info(ctx, "action fee has been validated", fields) + task.streamEvent(SupernodeEventTypeActionFeeVerified, "action-fee has been validated", "", send) /* 3. Ensure this super-node is eligible -------------------------------------- */ + fields[logtrace.FieldSupernodeState] = task.config.SupernodeAccountAddress if err := task.ensureIsTopSupernode(ctx, uint64(action.BlockHeight), fields); err != nil { - return nil, err + return err } + logtrace.Info(ctx, "current-supernode exists in the top-sn list", fields) + task.streamEvent(SupernodeEventTypeTopSupernodeCheckPassed, "current supernode exists in the top-sn list", "", send) /* 4. Decode cascade metadata -------------------------------------------------- */ cascadeMeta, err := task.decodeCascadeMetadata(ctx, action.Metadata, fields) if err != nil { - return nil, err + return err } + logtrace.Info(ctx, "cascade metadata decoded", fields) + task.streamEvent(SupernodeEventTypeMetadataDecoded, "cascade metadata has been decoded", "", send) /* 5. Verify data hash --------------------------------------------------------- */ if err := task.verifyDataHash(ctx, req.Data, cascadeMeta.DataHash, fields); err != nil { - return nil, err + return err } + logtrace.Info(ctx, "data-hash has been verified", fields) + task.streamEvent(SupernodeEventTypeDataHashVerified, "data-hash has been verified", "", send) /* 6. Encode the raw data ------------------------------------------------------ */ encResp, err := task.encodeInput(ctx, req.Data, fields) if err != nil { - return nil, err + return err } + logtrace.Info(ctx, "input-data has been encoded", fields) + task.streamEvent(SupernodeEventTypeInputEncoded, "input data has been encoded", "", send) /* 7. Signature verification + layout decode ---------------------------------- */ layout, signature, err := task.verifySignatureAndDecodeLayout( ctx, cascadeMeta.Signatures, action.Creator, encResp.Metadata, fields, ) if err != nil { - return nil, err + return err } + logtrace.Info(ctx, "signature has been verified", fields) + task.streamEvent(SupernodeEventTypeSignatureVerified, "signature has been verified", "", send) /* 8. Generate RQ-ID files ----------------------------------------------------- */ rqidResp, err := task.generateRQIDFiles(ctx, cascadeMeta, signature, action.Creator, encResp.Metadata, fields) if err != nil { - return nil, err + return err } + logtrace.Info(ctx, "rq-id files have been generated", fields) + task.streamEvent(SupernodeEventTypeRQIDsGenerated, "rq-id files have been generated", "", send) /* 9. Consistency checks ------------------------------------------------------- */ if err := verifyIDs(ctx, layout, encResp.Metadata); err != nil { - return nil, task.wrapErr(ctx, "failed to verify IDs", err, fields) + return err } + logtrace.Info(ctx, "rq-ids have been verified", fields) + task.streamEvent(SupernodeEventTypeRqIDsVerified, "rq-ids have been verified", "", send) /* 10. Persist artefacts -------------------------------------------------------- */ if err := task.storeArtefacts(ctx, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil { - return nil, err + return err } logtrace.Info(ctx, "artefacts have been stored", fields) + task.streamEvent(SupernodeEventTypeArtefactsStored, "artefacts have been stored", "", send) resp, err := task.lumeraClient.ActionMsg().FinalizeCascadeAction(ctx, action.ActionID, rqidResp.RQIDs) if err != nil { - logtrace.Info(ctx, "Finalize Action Error", logtrace.Fields{ - "error": err.Error(), - }) - return nil, err + fields[logtrace.FieldError] = err.Error() + logtrace.Info(ctx, "Finalize Action Error", fields) + return task.wrapErr(ctx, "failed to finalize action", err, fields) } + fields[logtrace.FieldTxHash] = resp.TxHash + logtrace.Info(ctx, "action has been finalized", fields) + task.streamEvent(SupernodeEventTypeActionFinalized, "action has been finalized", resp.TxHash, send) - logtrace.Info(ctx, "Finalize Action Response", logtrace.Fields{ - "resp": resp.Code, - "log": resp.TxHash}) - - // Return success when the cascade action is finalized without errors - return &RegisterResponse{Success: true, Message: fmt.Sprintf("successfully uploaded and finalized input data with txID: %s", resp.TxHash)}, nil + return nil } diff --git a/supernode/services/cascade/task.go b/supernode/services/cascade/task.go index fd15ccc9..2cfa5e09 100644 --- a/supernode/services/cascade/task.go +++ b/supernode/services/cascade/task.go @@ -2,7 +2,6 @@ package cascade import ( "context" - "github.com/LumeraProtocol/supernode/pkg/storage/files" "github.com/LumeraProtocol/supernode/supernode/services/common" ) @@ -43,3 +42,13 @@ func NewCascadeRegistrationTask(service *CascadeService) *CascadeRegistrationTas return task } + +func (task *CascadeRegistrationTask) streamEvent(eventType SupernodeEventType, msg, txHash string, send func(resp *RegisterResponse) error) { + _ = send(&RegisterResponse{ + EventType: eventType, + Message: msg, + TxHash: txHash, + }) + + return +} diff --git a/supernode/services/common/network_handler.go b/supernode/services/common/network_handler.go deleted file mode 100644 index 2ff7d1c6..00000000 --- a/supernode/services/common/network_handler.go +++ /dev/null @@ -1,256 +0,0 @@ -package common - -import ( - "context" - "fmt" - "sync" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" - "github.com/LumeraProtocol/supernode/pkg/lumera" - supernode "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" - "github.com/LumeraProtocol/supernode/pkg/types" - node "github.com/LumeraProtocol/supernode/supernode/node/supernode" -) - -// NetworkHandler common functionality related for SNs Mesh and other interconnections -type NetworkHandler struct { - task *SuperNodeTask - lumeraHandler lumera.Client - - nodeMaker node.NodeMaker - NodeClient node.ClientInterface - - acceptedMu sync.Mutex - Accepted SuperNodePeerList - - meshedNodes []types.MeshedSuperNode - // valid only for secondary node - ConnectedTo *SuperNodePeer - - superNodeAccAddress string - minNumberConnectedNodes int -} - -// NewNetworkHandler creates instance of NetworkHandler -func NewNetworkHandler(task *SuperNodeTask, - nodeClient node.ClientInterface, - nodeMaker node.NodeMaker, - lc lumera.Client, - minNumberConnectedNodes int, -) *NetworkHandler { - return &NetworkHandler{ - task: task, - nodeMaker: nodeMaker, - lumeraHandler: lc, - NodeClient: nodeClient, - minNumberConnectedNodes: minNumberConnectedNodes, - } -} - -// MeshedNodes return SupernodeAccountAddresses of meshed nodes -func (h *NetworkHandler) MeshedNodes() []string { - var ids []string - for _, peer := range h.meshedNodes { - ids = append(ids, peer.NodeID) - } - return ids -} - -// Session is handshake wallet to supernode -func (h *NetworkHandler) Session(ctx context.Context, isPrimary bool) error { - if err := h.task.RequiredStatus(StatusTaskStarted); err != nil { - return err - } - - <-h.task.NewAction(func(ctx context.Context) error { - if isPrimary { - log.WithContext(ctx).Debug("Acts as primary node") - h.task.UpdateStatus(StatusPrimaryMode) - return nil - } - - log.WithContext(ctx).Debug("Acts as secondary node") - h.task.UpdateStatus(StatusSecondaryMode) - - return nil - }) - return nil -} - -// AcceptedNodes waits for connection supernodes, as soon as there is the required amount returns them. -func (h *NetworkHandler) AcceptedNodes(serverCtx context.Context) (SuperNodePeerList, error) { - if err := h.task.RequiredStatus(StatusPrimaryMode); err != nil { - return nil, fmt.Errorf("AcceptedNodes: %w", err) - } - - <-h.task.NewAction(func(ctx context.Context) error { - log.WithContext(ctx).Debug("Waiting for supernodes to connect") - - sub := h.task.SubscribeStatus() - for { - select { - case <-serverCtx.Done(): - return nil - case <-ctx.Done(): - return nil - case status := <-sub(): - if status.Is(StatusConnected) { - return nil - } - } - } - }) - return h.Accepted, nil -} - -// SessionNode accepts secondary node -func (h *NetworkHandler) SessionNode(_ context.Context, nodeID string) error { - h.acceptedMu.Lock() - defer h.acceptedMu.Unlock() - - if err := h.task.RequiredStatus(StatusPrimaryMode); err != nil { - return fmt.Errorf("SessionNode: %w", err) - } - - var err error - - <-h.task.NewAction(func(ctx context.Context) error { - if node := h.Accepted.ByID(nodeID); node != nil { - log.WithContext(ctx).WithField("nodeID", nodeID).Errorf("node is already registered") - err = errors.Errorf("node %q is already registered", nodeID) - return nil - } - - var someNode *SuperNodePeer - someNode, err = h.toSupernodePeer(ctx, nodeID) - if err != nil { - log.WithContext(ctx).WithField("nodeID", nodeID).WithError(err).Errorf("get node by extID") - err = errors.Errorf("get node by extID %s: %w", nodeID, err) - return nil - } - h.Accepted.Add(someNode) - - log.WithContext(ctx).WithField("nodeID", nodeID).Debug("Accept secondary node") - - if len(h.Accepted) >= h.minNumberConnectedNodes { - h.task.UpdateStatus(StatusConnected) - } - return nil - }) - return err -} - -// ConnectTo connects to primary node -func (h *NetworkHandler) ConnectTo(_ context.Context, nodeID, sessID string) error { - if err := h.task.RequiredStatus(StatusSecondaryMode); err != nil { - return err - } - - var err error - - <-h.task.NewAction(func(ctx context.Context) error { - var someNode *SuperNodePeer - someNode, err = h.toSupernodePeer(ctx, nodeID) - if err != nil { - log.WithContext(ctx).WithField("nodeID", nodeID).WithError(err).Errorf("get node by extID") - return nil - } - - if err := someNode.Connect(ctx); err != nil { - log.WithContext(ctx).WithField("nodeID", nodeID).WithError(err).Errorf("connect to node") - return nil - } - - if err = someNode.Session(ctx, h.superNodeAccAddress, sessID); err != nil { - log.WithContext(ctx).WithField("sessID", sessID).WithField("sn-acc-address", h.superNodeAccAddress).WithError(err).Errorf("handshake with peer") - return nil - } - - h.ConnectedTo = someNode - h.task.UpdateStatus(StatusConnected) - return nil - }) - return err -} - -// MeshNodes to set info of all meshed supernodes - that will be to send -func (h *NetworkHandler) MeshNodes(_ context.Context, meshedNodes []types.MeshedSuperNode) error { - if err := h.task.RequiredStatus(StatusConnected); err != nil { - return err - } - h.meshedNodes = meshedNodes - - return nil -} - -// CheckNodeInMeshedNodes checks if the node is in the active mesh (by nodeID) -func (h *NetworkHandler) CheckNodeInMeshedNodes(nodeID string) error { - if h.meshedNodes == nil { - return errors.New("nil meshedNodes") - } - - for _, node := range h.meshedNodes { - if node.NodeID == nodeID { - return nil - } - } - - return errors.New("nodeID not found") -} - -// toSupernodePeer returns information about SN by its account-address -func (h *NetworkHandler) toSupernodePeer(ctx context.Context, supernodeAccountAddress string) (*SuperNodePeer, error) { - sn, err := h.lumeraHandler.SuperNode().GetSupernodeBySupernodeAddress(ctx, supernodeAccountAddress) - if err != nil { - return nil, err - } - - supernodeIP, err := supernode.GetLatestIP(sn) - if err != nil { - return nil, err - } - - someNode := NewSuperNode(h.NodeClient, supernodeIP, supernodeAccountAddress, h.nodeMaker) - return someNode, nil -} - -// Connect connects to grpc Server and setup pointer to concrete client wrapper -func (node *SuperNodePeer) Connect(ctx context.Context) error { - connCtx, connCancel := context.WithTimeout(ctx, defaultConnectToNodeTimeout) - defer connCancel() - - conn, err := node.ClientInterface.Connect(connCtx, node.Address) - if err != nil { - return err - } - - node.ConnectionInterface = conn - node.SuperNodePeerAPIInterface = node.MakeNode(conn) - return nil -} - -func (h *NetworkHandler) CloseSNsConnections(ctx context.Context) error { - for _, node := range h.Accepted { - if node.ConnectionInterface != nil { - if err := node.Close(); err != nil { - log.WithContext(ctx).WithError(err).Errorf("close connection to node %s", node.ID) - } - } else { - log.WithContext(ctx).Errorf("node %s has no connection", node.ID) - } - - } - - if h.ConnectedTo != nil { - if err := h.ConnectedTo.Close(); err != nil { - log.WithContext(ctx).WithError(err).Errorf("close connection to node %s", h.ConnectedTo.ID) - } - } - - return nil -} - -func (h *NetworkHandler) IsPrimary() bool { - return h.ConnectedTo == nil -} diff --git a/supernode/services/common/node_peer.go b/supernode/services/common/node_peer.go deleted file mode 100644 index 07ae4f7d..00000000 --- a/supernode/services/common/node_peer.go +++ /dev/null @@ -1,82 +0,0 @@ -package common - -import ( - "time" - - node "github.com/LumeraProtocol/supernode/supernode/node/supernode" -) - -const ( - defaultConnectToNodeTimeout = time.Second * 35 -) - -// SuperNodePeer represents a single supernode -type SuperNodePeer struct { - node.ClientInterface - node.NodeMaker - node.ConnectionInterface - node.SuperNodePeerAPIInterface - - ID string - Address string -} - -//// Connect connects to grpc Server and setup pointer to concrete client wrapper -//func (node *SuperNodePeer) Connect(ctx context.Context) error { -// connCtx, connCancel := context.WithTimeout(ctx, defaultConnectToNodeTimeout) -// defer connCancel() -// -// conn, err := node.ClientInterface.Connect(connCtx, node.Address) -// if err != nil { -// return err -// } -// -// node.ConnectionInterface = conn -// node.SuperNodePeerAPIInterface = node.MakeNode(conn) -// return nil -//} - -// NewSuperNode returns a new Node instance. -func NewSuperNode( - client node.ClientInterface, - address string, nodeAddress string, - nodeMaker node.NodeMaker) *SuperNodePeer { - return &SuperNodePeer{ - ClientInterface: client, - NodeMaker: nodeMaker, - Address: address, - ID: nodeAddress, - } -} - -// SuperNodePeerList represents muptiple SenseRegistrationNodes -type SuperNodePeerList []*SuperNodePeer - -// Add adds a new node to the list -func (list *SuperNodePeerList) Add(node *SuperNodePeer) { - *list = append(*list, node) -} - -// ByID returns a node from the list by the given id. -func (list *SuperNodePeerList) ByID(id string) *SuperNodePeer { - for _, someNode := range *list { - if someNode.ID == id { - return someNode - } - } - return nil -} - -// Remove removes a node from the list by the given id. -func (list *SuperNodePeerList) Remove(id string) { - for i, someNode := range *list { - if someNode.ID == id { - if i+1 < len(*list) { - *list = append((*list)[:i], (*list)[i+1:]...) - } else { - *list = (*list)[:i] - } - break - } - } -}