Skip to content

Commit 0566412

Browse files
committed
[executor] Support switching between Protobuf and JSON transport
1 parent 77b59ab commit 0566412

File tree

3 files changed

+24
-6
lines changed

3 files changed

+24
-6
lines changed

executor/executable/controllabletask.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"time"
3434

3535
"github.com/AliceO2Group/Control/common/event"
36+
"github.com/AliceO2Group/Control/common/utils"
3637
"github.com/AliceO2Group/Control/core/controlcommands"
3738
"github.com/AliceO2Group/Control/executor/executorcmd"
3839
pb "github.com/AliceO2Group/Control/executor/protos"
@@ -94,7 +95,12 @@ func (t *ControllableTask) Launch() error {
9495
"id": t.ti.TaskID.Value,
9596
}).
9697
Debug("starting gRPC client")
97-
t.rpc = executorcmd.NewClient(t.tci.ControlPort, t.tci.ControlMode)
98+
99+
controlTransport := executorcmd.ProtobufTransport
100+
if utils.StringSliceContains(taskCmd.Args, "OCClite") {
101+
controlTransport = executorcmd.JsonTransport
102+
}
103+
t.rpc = executorcmd.NewClient(t.tci.ControlPort, t.tci.ControlMode, controlTransport)
98104
if t.rpc == nil {
99105
return errors.New("could not start gRPC client")
100106
}

executor/executorcmd/client.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,18 @@ import (
4848

4949
var log = logger.New(logrus.StandardLogger(), "executorcmd")
5050

51-
func NewClient(controlPort uint64, controlMode controlmode.ControlMode) *RpcClient {
51+
type ControlTransport uint32
52+
const (
53+
ProtobufTransport = ControlTransport(0)
54+
JsonTransport = ControlTransport(1)
55+
)
56+
57+
func NewClient(controlPort uint64, controlMode controlmode.ControlMode, controlTransport ControlTransport) *RpcClient {
5258
endpoint := fmt.Sprintf("127.0.0.1:%d", controlPort)
5359
log.WithField("endpoint", endpoint).Debug("starting new gRPC client")
5460

5561
cxt, cancel := context.WithTimeout(context.Background(), 20*time.Second)
56-
conn, err := grpc.DialContext(cxt, endpoint, grpc.WithInsecure())
62+
conn, err := grpc.DialContext(cxt, endpoint, grpc.WithInsecure(), grpc.WithBlock())
5763
if err != nil {
5864
log.WithField("error", err.Error()).
5965
WithField("endpoint", endpoint).
@@ -62,9 +68,15 @@ func NewClient(controlPort uint64, controlMode controlmode.ControlMode) *RpcClie
6268
return nil
6369
}
6470

71+
var occClient pb.OccClient
72+
if controlTransport == JsonTransport {
73+
occClient = nopb.NewOccClient(conn)
74+
} else {
75+
occClient = pb.NewOccClient(conn)
76+
}
77+
6578
client := &RpcClient {
66-
//OccClient: pb.NewOccClient(conn),
67-
OccClient: nopb.NewOccClient(conn),
79+
OccClient: occClient,
6880
conn: conn,
6981
}
7082

occ/peanut/peanut.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func Run(cmdString string) (err error) {
280280
// Setup RPC
281281
go func() {
282282
// FIXME allow choice of controlmode.FAIRMQ
283-
rpcClient = executorcmd.NewClient(occPort, controlmode.DIRECT)
283+
rpcClient = executorcmd.NewClient(occPort, controlmode.DIRECT, executorcmd.ProtobufTransport)
284284
var response *pb.GetStateReply
285285
response, err = rpcClient.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{})
286286
if err != nil {

0 commit comments

Comments
 (0)