Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@ test-integration:
test-system:
cd tests/system && go test -tags=system_test -v .

gen-lumera-proto:
cd ./proto/lumera/action && protoc --go_out=../../../gen/lumera/action --go-grpc_out=../../../gen/lumera/action --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative action.proto && cd ../../../
cd ./proto/lumera/action && protoc --go_out=../../../gen/lumera/action --go-grpc_out=../../../gen/lumera/action --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative action_service.proto && cd ../../../
cd ./proto/lumera/supernode && protoc --go_out=../../../gen/lumera/supernode --go-grpc_out=../../../gen/lumera/supernode --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative supernode.proto && cd ../../../
cd ./proto/lumera/supernode && protoc --go_out=../../../gen/lumera/supernode --go-grpc_out=../../../gen/lumera/supernode --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative supernode_service.proto && cd ../../../

gen-dupe-detection-proto:
cd ./proto/dupedetection && protoc --go_out=../../gen/dupedetection --go-grpc_out=../../gen/dupedetection --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative dd-server.proto && cd ../../

gen-raptor-q-proto:
cd ./proto/raptorq && protoc --go_out=../../gen/raptorq --go-grpc_out=../../gen/raptorq --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative raptorq.proto && cd ../../
gen-cascade:
protoc \
--proto_path=proto \
--go_out=gen \
--go_opt=paths=source_relative \
--go-grpc_out=gen \
--go-grpc_opt=paths=source_relative \
proto/supernode/action/cascade/service.proto

# Define the paths
SUPERNODE_SRC=supernode/main.go
Expand Down
264 changes: 116 additions & 148 deletions gen/supernode/action/cascade/service.pb.go

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions gen/supernode/action/cascade/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 5 additions & 8 deletions proto/supernode/action/cascade/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ syntax = "proto3";
package cascade;
option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade";

service CascadeService {
rpc UploadInputData (stream UploadInputDataRequest) returns (UploadInputDataResponse);
service CascadeService {
rpc Register (stream RegisterRequest) returns (RegisterResponse);
}

message UploadInputDataRequest {
message RegisterRequest {
oneof request_type {
DataChunk chunk = 1;
Metadata metadata = 2;
Expand All @@ -18,14 +18,11 @@ message DataChunk {
}

message Metadata {
string filename = 1;
string task_id = 1;
string action_id = 2;
string data_hash = 3;
int32 rq_max = 4;
string signed_data = 5;
}

message UploadInputDataResponse {
message RegisterResponse {
bool success = 1;
string message = 2;
}
16 changes: 8 additions & 8 deletions supernode/node/action/server/cascade/cascade_action_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func NewCascadeActionServer(service *cascadeService.CascadeService) *CascadeActi
func (server *CascadeActionServer) Desc() *grpc.ServiceDesc {
return &pb.CascadeService_ServiceDesc
}
func (server *CascadeActionServer) UploadInputData(stream pb.CascadeService_UploadInputDataServer) error {
func (server *CascadeActionServer) Register(stream pb.CascadeService_RegisterServer) error {
fields := logtrace.Fields{
logtrace.FieldMethod: "UploadInputData",
logtrace.FieldMethod: "Register",
logtrace.FieldModule: "CascadeActionServer",
}

Expand All @@ -52,21 +52,20 @@ func (server *CascadeActionServer) UploadInputData(stream pb.CascadeService_Uplo

// Check which type of message we received
switch x := req.RequestType.(type) {
case *pb.UploadInputDataRequest_Chunk:
case *pb.RegisterRequest_Chunk:
// Add data chunk to our collection
allData = append(allData, x.Chunk.Data...)
logtrace.Info(ctx, "received data chunk", logtrace.Fields{
"chunk_size": len(x.Chunk.Data),
"total_size_so_far": len(allData),
})

case *pb.UploadInputDataRequest_Metadata:
case *pb.RegisterRequest_Metadata:
// Store metadata - this should be the final message
metadata = x.Metadata
logtrace.Info(ctx, "received metadata", logtrace.Fields{
"filename": metadata.Filename,
"task_id": metadata.TaskId,
"action_id": metadata.ActionId,
"data_hash": metadata.DataHash,
})
}
}
Expand All @@ -79,7 +78,8 @@ func (server *CascadeActionServer) UploadInputData(stream pb.CascadeService_Uplo

// Process the complete data
task := server.service.NewCascadeRegistrationTask()
res, err := task.UploadInputData(ctx, &cascadeService.UploadInputDataRequest{
res, err := task.Register(ctx, &cascadeService.RegisterRequest{
TaskID: metadata.TaskId,
ActionID: metadata.ActionId,
Data: allData,
})
Expand All @@ -91,7 +91,7 @@ func (server *CascadeActionServer) UploadInputData(stream pb.CascadeService_Uplo
}

// Send the response
return stream.SendMsg(&pb.UploadInputDataResponse{
return stream.SendMsg(&pb.RegisterResponse{
Success: res.Success,
Message: res.Message,
})
Expand Down
7 changes: 3 additions & 4 deletions supernode/services/cascade/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ type GenRQIdentifiersFilesResponse struct {
RQIDs []string
// RedundantMetadataFiles is a list of redundant files that are generated from the Metadata file
RedundantMetadataFiles [][]byte
// Compressed[B64(JSON(layout)).Signature]
B64EncodedMetadataFileWithSignatureCompressed []byte
}

// GenRQIdentifiersFiles generates Redundant Metadata Files and IDs
Expand All @@ -51,13 +49,14 @@ func GenRQIdentifiersFiles(ctx context.Context, req GenRQIdentifiersFilesRequest
encMetadataFileWithSignature := buffer.Bytes()

// Generate the specified number of variant IDs
_, RQIDsFiles, err := GetIDFiles(ctx, encMetadataFileWithSignature, req.IC, req.RqMax)
rqIdIds, rqIDsFiles, err := GetIDFiles(ctx, encMetadataFileWithSignature, req.IC, req.RqMax)
if err != nil {
return resp, errors.Errorf("get ID Files: %w", err)
}

return GenRQIdentifiersFilesResponse{
RedundantMetadataFiles: RQIDsFiles,
RedundantMetadataFiles: rqIDsFiles,
RQIDs: rqIdIds,
}, nil
}

Expand Down
17 changes: 9 additions & 8 deletions supernode/services/cascade/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ import (
"google.golang.org/grpc/status"
)

// UploadInputDataRequest contains parameters for upload request
type UploadInputDataRequest struct {
// RegisterRequest contains parameters for upload request
type RegisterRequest struct {
TaskID string
ActionID string
Data []byte
}

// UploadInputDataResponse contains the result of upload
type UploadInputDataResponse struct {
// RegisterResponse contains the result of upload
type RegisterResponse struct {
Success bool
Message string
}

// UploadInputData processes the upload request for cascade input data
func (task *CascadeRegistrationTask) UploadInputData(ctx context.Context, req *UploadInputDataRequest) (*UploadInputDataResponse, error) {
// Register processes the upload request for cascade input data
func (task *CascadeRegistrationTask) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) {
fields := logtrace.Fields{
logtrace.FieldMethod: "UploadInputData",
logtrace.FieldMethod: "Register",
logtrace.FieldRequest: req,
}

Expand Down Expand Up @@ -163,7 +164,7 @@ func (task *CascadeRegistrationTask) UploadInputData(ctx context.Context, req *U
}
logtrace.Info(ctx, "raptor-q symbols have been stored", fields)

return &UploadInputDataResponse{
return &RegisterResponse{
Success: true,
Message: "successfully uploaded input data",
}, nil
Expand Down