Skip to content

Commit 118acf5

Browse files
committed
[odcshim] Add support for ODC partitioning
1 parent f1d2e74 commit 118acf5

File tree

6 files changed

+657
-187
lines changed

6 files changed

+657
-187
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ require (
3636
github.com/gogo/protobuf v1.3.1
3737
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
3838
github.com/golang/protobuf v1.4.2
39-
github.com/google/uuid v1.1.1
39+
github.com/google/uuid v1.1.1 // indirect
4040
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
4141
github.com/gorilla/websocket v1.4.2 // indirect
4242
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 // indirect

odcshim/occserver/handlers.go

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ import (
3838
"google.golang.org/grpc/status"
3939
)
4040

41-
func handleGetState(ctx context.Context, odcClient *odcclient.RpcClient) (string, error) {
41+
func handleGetState(ctx context.Context, odcClient *odcclient.RpcClient, envId string) (string, error) {
4242
req := &odc.StateRequest{
43+
Partitionid: envId,
4344
Path: "",
4445
Detailed: false,
4546
}
@@ -50,6 +51,10 @@ func handleGetState(ctx context.Context, odcClient *odcclient.RpcClient) (string
5051
rep *odc.StateReply
5152
)
5253

54+
if envId == "" {
55+
return newState, errors.New("cannot proceed with empty environment id")
56+
}
57+
5358
rep, err = odcClient.GetState(ctx, req, grpc.EmptyCallOption{})
5459
if err != nil {
5560
return newState, printGrpcError(err)
@@ -72,17 +77,18 @@ func handleGetState(ctx context.Context, odcClient *odcclient.RpcClient) (string
7277
"odcMsg": rep.Reply.Msg,
7378
"odcStatus": rep.Reply.Status.String(),
7479
"odcExectime": rep.Reply.Exectime,
75-
"odcRunid": rep.Reply.Runid,
80+
"odcRunid": rep.Reply.Partitionid,
7681
"odcSessionid": rep.Reply.Sessionid,
7782
"odcState": rep.Reply.State,
7883
}).
7984
Debug("call to ODC complete")
8085
return stateForOdcState(newState), err
8186
}
8287

83-
func handleStart(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry ) error {
88+
func handleStart(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, envId string) error {
8489
req := &odc.StartRequest{
8590
Request: &odc.StateRequest{
91+
Partitionid: envId,
8692
Path: "",
8793
Detailed: false,
8894
},
@@ -91,6 +97,10 @@ func handleStart(ctx context.Context, odcClient *odcclient.RpcClient, arguments
9197
var err error = nil
9298
var rep *odc.StateReply
9399

100+
if envId == "" {
101+
return errors.New("cannot proceed with empty environment id")
102+
}
103+
94104
rep, err = odcClient.Start(ctx, req, grpc.EmptyCallOption{})
95105
if err != nil {
96106
return printGrpcError(err)
@@ -111,16 +121,17 @@ func handleStart(ctx context.Context, odcClient *odcclient.RpcClient, arguments
111121
"odcMsg": rep.Reply.Msg,
112122
"odcStatus": rep.Reply.Status.String(),
113123
"odcExectime": rep.Reply.Exectime,
114-
"odcRunid": rep.Reply.Runid,
124+
"odcRunid": rep.Reply.Partitionid,
115125
"odcSessionid": rep.Reply.Sessionid,
116126
}).
117127
Debug("call to ODC complete")
118128
return err
119129
}
120130

121-
func handleStop(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry ) error {
131+
func handleStop(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, envId string) error {
122132
req := &odc.StopRequest{
123133
Request: &odc.StateRequest{
134+
Partitionid: envId,
124135
Path: "",
125136
Detailed: false,
126137
},
@@ -129,6 +140,10 @@ func handleStop(ctx context.Context, odcClient *odcclient.RpcClient, arguments [
129140
var err error = nil
130141
var rep *odc.StateReply
131142

143+
if envId == "" {
144+
return errors.New("cannot proceed with empty environment id")
145+
}
146+
132147
rep, err = odcClient.Stop(ctx, req, grpc.EmptyCallOption{})
133148
if err != nil {
134149
return printGrpcError(err)
@@ -149,16 +164,40 @@ func handleStop(ctx context.Context, odcClient *odcclient.RpcClient, arguments [
149164
"odcMsg": rep.Reply.Msg,
150165
"odcStatus": rep.Reply.Status.String(),
151166
"odcExectime": rep.Reply.Exectime,
152-
"odcRunid": rep.Reply.Runid,
167+
"odcRunid": rep.Reply.Partitionid,
153168
"odcSessionid": rep.Reply.Sessionid,
154169
}).
155170
Debug("call to ODC complete")
156171
return err
157172
}
158173

159-
func handleReset(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry ) error {
174+
func handleReset(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, envId string) error {
175+
if envId == "" {
176+
return errors.New("cannot proceed with empty environment id")
177+
}
178+
179+
err := doReset(ctx, odcClient, arguments, envId)
180+
if err != nil {
181+
return printGrpcError(err)
182+
}
183+
184+
err = doTerminate(ctx, odcClient, arguments, envId)
185+
if err != nil {
186+
return printGrpcError(err)
187+
}
188+
189+
err = doShutdown(ctx, odcClient, arguments, envId)
190+
if err != nil {
191+
return printGrpcError(err)
192+
}
193+
return nil
194+
}
195+
196+
func doReset(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, envId string) error {
197+
// RESET
160198
req := &odc.ResetRequest{
161199
Request: &odc.StateRequest{
200+
Partitionid: envId,
162201
Path: "",
163202
Detailed: false,
164203
},
@@ -187,17 +226,18 @@ func handleReset(ctx context.Context, odcClient *odcclient.RpcClient, arguments
187226
"odcMsg": rep.Reply.Msg,
188227
"odcStatus": rep.Reply.Status.String(),
189228
"odcExectime": rep.Reply.Exectime,
190-
"odcRunid": rep.Reply.Runid,
229+
"odcRunid": rep.Reply.Partitionid,
191230
"odcSessionid": rep.Reply.Sessionid,
192231
}).
193232
Debug("call to ODC complete")
194233
return err
195234
}
196235

197-
func handleExit(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry ) error {
236+
func doTerminate(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, envId string) error {
198237
// TERMINATE
199238
req := &odc.TerminateRequest{
200239
Request: &odc.StateRequest{
240+
Partitionid: envId,
201241
Path: "",
202242
Detailed: false,
203243
},
@@ -226,14 +266,20 @@ func handleExit(ctx context.Context, odcClient *odcclient.RpcClient, arguments [
226266
"odcMsg": rep.Reply.Msg,
227267
"odcStatus": rep.Reply.Status.String(),
228268
"odcExectime": rep.Reply.Exectime,
229-
"odcRunid": rep.Reply.Runid,
269+
"odcRunid": rep.Reply.Partitionid,
230270
"odcSessionid": rep.Reply.Sessionid,
231271
}).
232272
Debug("call to ODC complete")
273+
return err
274+
}
233275

276+
func doShutdown(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, envId string) error{
234277
// SHUTDOWN
235-
shutdownRequest := &odc.ShutdownRequest{}
278+
shutdownRequest := &odc.ShutdownRequest{
279+
Partitionid: envId,
280+
}
236281

282+
var err error = nil
237283
var shutdownResponse *odc.GeneralReply
238284
shutdownResponse, err = odcClient.Shutdown(ctx, shutdownRequest, grpc.EmptyCallOption{})
239285
if err != nil {
@@ -252,17 +298,25 @@ func handleExit(ctx context.Context, odcClient *odcclient.RpcClient, arguments [
252298
return fmt.Errorf("status %s from ODC", replyStatus.String())
253299
}
254300
log.WithFields(logrus.Fields{
255-
"odcMsg": shutdownResponse.Msg,
256-
"odcStatus": shutdownResponse.Status.String(),
257-
"odcExectime": shutdownResponse.Exectime,
258-
"odcRunid": shutdownResponse.Runid,
259-
"odcSessionid": shutdownResponse.Sessionid,
260-
}).
301+
"odcMsg": shutdownResponse.Msg,
302+
"odcStatus": shutdownResponse.Status.String(),
303+
"odcExectime": shutdownResponse.Exectime,
304+
"odcRunid": shutdownResponse.Partitionid,
305+
"odcSessionid": shutdownResponse.Sessionid,
306+
}).
261307
Debug("call to ODC complete")
262308
return err
263309
}
264310

265-
func handleRun(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry ) error {
311+
func handleExit(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry ) error {
312+
return nil
313+
}
314+
315+
func handleRun(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, envId string) error {
316+
if envId == "" {
317+
return errors.New("cannot proceed with empty environment id")
318+
}
319+
266320
log.Trace("BEGIN handleRun")
267321
defer log.Trace("END handleRun")
268322
// RUN request, includes INITIALIZE+SUBMIT+ACTIVATE
@@ -278,6 +332,7 @@ func handleRun(ctx context.Context, odcClient *odcclient.RpcClient, arguments []
278332
}
279333

280334
runRequest := &odc.RunRequest{
335+
Partitionid: envId,
281336
Topology: topology,
282337
}
283338

@@ -304,17 +359,24 @@ func handleRun(ctx context.Context, odcClient *odcclient.RpcClient, arguments []
304359
"odcMsg": runResponse.Msg,
305360
"odcStatus": runResponse.Status.String(),
306361
"odcExectime": runResponse.Exectime,
307-
"odcRunid": runResponse.Runid,
362+
"odcRunid": runResponse.Partitionid,
308363
"odcSessionid": runResponse.Sessionid,
309364
}).
310365
Debug("call to ODC complete")
311366
return err
312367
}
313368

314369

315-
func handleConfigure(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry ) error {
370+
func handleConfigure(ctx context.Context, odcClient *odcclient.RpcClient, arguments []*pb.ConfigEntry, topology string, envId string) error {
371+
if envId == "" {
372+
return errors.New("cannot proceed with empty environment id")
373+
}
374+
375+
var err error = nil
376+
316377
// SetProperties before CONFIGURE
317378
setPropertiesRequest := &odc.SetPropertiesRequest{
379+
Partitionid: envId,
318380
Path: "",
319381
Properties: make([]*odc.Property, len(arguments)),
320382
}
@@ -328,7 +390,15 @@ func handleConfigure(ctx context.Context, odcClient *odcclient.RpcClient, argume
328390
}
329391
}
330392

331-
var err error = nil
393+
err = handleRun(ctx, odcClient, []*pb.ConfigEntry{{
394+
Key: "topology",
395+
Value: topology,
396+
}}, envId)
397+
if err != nil {
398+
return printGrpcError(err)
399+
}
400+
401+
332402
var setPropertiesResponse *odc.GeneralReply
333403
setPropertiesResponse, err = odcClient.SetProperties(ctx, setPropertiesRequest, grpc.EmptyCallOption{})
334404
if err != nil {
@@ -350,7 +420,7 @@ func handleConfigure(ctx context.Context, odcClient *odcclient.RpcClient, argume
350420
"odcMsg": setPropertiesResponse.Msg,
351421
"odcStatus": setPropertiesResponse.Status.String(),
352422
"odcExectime": setPropertiesResponse.Exectime,
353-
"odcRunid": setPropertiesResponse.Runid,
423+
"odcRunid": setPropertiesResponse.Partitionid,
354424
"odcSessionid": setPropertiesResponse.Sessionid,
355425
}).
356426
Debug("call to ODC complete")
@@ -359,6 +429,7 @@ func handleConfigure(ctx context.Context, odcClient *odcclient.RpcClient, argume
359429
// CONFIGURE
360430
configureRequest := &odc.ConfigureRequest{
361431
Request: &odc.StateRequest{
432+
Partitionid: envId,
362433
Path: "",
363434
Detailed: false,
364435
},
@@ -385,7 +456,7 @@ func handleConfigure(ctx context.Context, odcClient *odcclient.RpcClient, argume
385456
"odcMsg": configureResponse.Reply.Msg,
386457
"odcStatus": configureResponse.Reply.Status.String(),
387458
"odcExectime": configureResponse.Reply.Exectime,
388-
"odcRunid": configureResponse.Reply.Runid,
459+
"odcRunid": configureResponse.Reply.Partitionid,
389460
"odcSessionid": configureResponse.Reply.Sessionid,
390461
}).
391462
Debug("call to ODC complete")

odcshim/occserver/occserver.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@ var log = logger.New(logrus.StandardLogger(), "o2-aliecs-odc-shim")
4949
const CALL_TIMEOUT = 30*time.Second
5050

5151
type OccServerImpl struct {
52-
odcHost string
53-
odcPort int
54-
topology string
55-
odcClient *odcclient.RpcClient
52+
odcHost string
53+
odcPort int
54+
topology string
55+
environmentId string
56+
odcClient *odcclient.RpcClient
5657
}
5758

5859
func (s *OccServerImpl) disconnectAndTerminate() {
@@ -85,11 +86,7 @@ func (s *OccServerImpl) ensureClientConnected() error {
8586
return fmt.Errorf("cannot dial ODC endpoint: %s", endpoint)
8687
}
8788

88-
err := handleRun(cxt, s.odcClient, []*pb.ConfigEntry{{
89-
Key: "topology",
90-
Value: s.topology,
91-
}})
92-
return err
89+
return nil
9390
}
9491

9592
func NewServer(host string, port int, topology string) *grpc.Server {
@@ -167,7 +164,7 @@ func (s *OccServerImpl) GetState(ctx context.Context, req *pb.GetStateRequest) (
167164
State: "UNKNOWN",
168165
}
169166

170-
newState, err := handleGetState(ctx, s.odcClient)
167+
newState, err := handleGetState(ctx, s.odcClient, s.environmentId)
171168
if err == nil {
172169
rep.State = newState
173170
}
@@ -204,26 +201,34 @@ func (s *OccServerImpl) Transition(ctx context.Context, req *pb.TransitionReques
204201
var err error = nil
205202
switch event := strings.ToUpper(req.TransitionEvent); event {
206203
case "CONFIGURE":
207-
err = handleConfigure(ctx, s.odcClient, req.Arguments)
204+
// Extract environment ID from Arguments payload
205+
for _, entry := range req.Arguments {
206+
if entry.Key == "environment_id" {
207+
s.environmentId = entry.Value
208+
}
209+
}
210+
211+
err = handleConfigure(ctx, s.odcClient, req.Arguments, s.topology, s.environmentId)
208212
if err == nil {
209213
rep.Ok = true
210214
rep.State = "CONFIGURED"
211215
}
212216
case "START":
213-
err = handleStart(ctx, s.odcClient, req.Arguments)
217+
err = handleStart(ctx, s.odcClient, req.Arguments, s.environmentId)
214218
if err == nil {
215219
rep.Ok = true
216220
rep.State = "RUNNING"
217221
}
218222
case "STOP":
219-
err = handleStop(ctx, s.odcClient, req.Arguments)
223+
err = handleStop(ctx, s.odcClient, req.Arguments, s.environmentId)
220224
if err == nil {
221225
rep.Ok = true
222226
rep.State = "CONFIGURED"
223227
}
224228
case "RESET":
225-
err = handleReset(ctx, s.odcClient, req.Arguments)
229+
err = handleReset(ctx, s.odcClient, req.Arguments, s.environmentId)
226230
if err == nil {
231+
s.environmentId = ""
227232
rep.Ok = true
228233
rep.State = "STANDBY"
229234
}

0 commit comments

Comments
 (0)