Skip to content

Commit 8c35e14

Browse files
committed
[core] Allow templating of FairMQ channel rateLogging
1 parent aab5b8f commit 8c35e14

File tree

4 files changed

+41
-43
lines changed

4 files changed

+41
-43
lines changed

core/task/channel/channel.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type Channel struct {
4040
Type ChannelType `yaml:"type"`
4141
SndBufSize int `yaml:"sndBufSize"`
4242
RcvBufSize int `yaml:"rcvBufSize"`
43-
RateLogging int `yaml:"rateLogging"`
43+
RateLogging string `yaml:"rateLogging"`//actually an int but we allow templating
4444
Transport TransportType `yaml:"transport"` //default: default
4545
}
4646

@@ -82,10 +82,7 @@ func (c *Channel) UnmarshalYAML(unmarshal func(interface{}) error) (err error) {
8282
if err != nil {
8383
return
8484
}
85-
c.RateLogging, err = strconv.Atoi(aux.RateLogging)
86-
if err != nil {
87-
return
88-
}
85+
c.RateLogging = aux.RateLogging
8986
c.Transport = aux.Transport
9087

9188
return

core/task/channel/inbound.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (inbound Inbound) MarshalYAML() (interface{}, error) {
6767
Type ChannelType `yaml:"type"`
6868
SndBufSize int `yaml:"sndBufSize,omitempty"`
6969
RcvBufSize int `yaml:"rcvBufSize,omitempty"`
70-
RateLogging int `yaml:"rateLogging,omitempty"`
70+
RateLogging string `yaml:"rateLogging,omitempty"`
7171
Transport TransportType `yaml:"transport"`
7272
Addressing AddressFormat `yaml:"addressing"`
7373
}
@@ -114,7 +114,7 @@ func (inbound *Inbound) buildFMQMap(address string, transport TransportType) (pm
114114
chanProps := controlcommands.PropertyMap{
115115
"address": address,
116116
"method": "bind",
117-
"rateLogging": strconv.Itoa(inbound.RateLogging),
117+
"rateLogging": inbound.RateLogging,
118118
"rcvBufSize": strconv.Itoa(inbound.RcvBufSize),
119119
"rcvKernelSize": "0", //NOTE: hardcoded
120120
"sndBufSize": strconv.Itoa(inbound.SndBufSize),

core/task/channel/outbound.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (outbound Outbound) MarshalYAML() (interface{}, error) {
6565
Type ChannelType `yaml:"type"`
6666
SndBufSize int `yaml:"sndBufSize,omitempty"`
6767
RcvBufSize int `yaml:"rcvBufSize,omitempty"`
68-
RateLogging int `yaml:"rateLogging,omitempty"`
68+
RateLogging string `yaml:"rateLogging,omitempty"`
6969
Transport TransportType `yaml:"transport"`
7070
Target string `yaml:"target"`
7171
}
@@ -141,7 +141,7 @@ func (outbound *Outbound) buildFMQMap(address string, transport TransportType) (
141141
chanProps := controlcommands.PropertyMap {
142142
"address": address,
143143
"method": "connect",
144-
"rateLogging": strconv.Itoa(outbound.RateLogging),
144+
"rateLogging": outbound.RateLogging,
145145
"rcvBufSize": strconv.Itoa(outbound.RcvBufSize),
146146
"rcvKernelSize": "0", //NOTE: hardcoded
147147
"sndBufSize": strconv.Itoa(outbound.SndBufSize),

core/task/task.go

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,41 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
469469
propMap[k] = v
470470
}
471471

472+
// For FAIRMQ tasks, we append FairMQ channel configuration
473+
if class.Control.Mode == controlmode.FAIRMQ ||
474+
class.Control.Mode == controlmode.DIRECT {
475+
for _, inbCh := range channel.MergeInbound(t.GetParent().CollectInboundChannels(), class.Bind) {
476+
endpoint, ok := t.localBindMap[inbCh.Name]
477+
if !ok {
478+
log.WithFields(logrus.Fields{
479+
"channelName": inbCh.Name,
480+
"taskName": t.name,
481+
}).
482+
Error("endpoint not allocated for inbound channel")
483+
continue
484+
}
485+
486+
// We get the FairMQ-formatted propertyMap from the inbound channel spec
487+
chanProps := inbCh.ToFMQMap(endpoint)
488+
489+
// And we copy it into the task's propertyMap
490+
for k, v := range chanProps {
491+
propMap[k] = v
492+
}
493+
}
494+
for _, outboundCh := range channel.MergeOutbound(t.GetParent().CollectOutboundChannels(), class.Connect) {
495+
// We get the FairMQ-formatted propertyMap from the outbound channel spec
496+
chanProps := outboundCh.ToFMQMap(bindMap)
497+
498+
// And if valid, we copy it into the task's propertyMap
499+
if len(chanProps) > 0 {
500+
for k, v := range chanProps {
501+
propMap[k] = v
502+
}
503+
}
504+
}
505+
} // end append FairMQ configuration
506+
472507
objStack := make(map[string]interface{})
473508
objStack["ToPtree"] = template.MakeToPtreeFunc(varStack, propMap)
474509

@@ -503,40 +538,6 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
503538
}
504539
}
505540

506-
// For FAIRMQ tasks, we append FairMQ channel configuration
507-
if class.Control.Mode == controlmode.FAIRMQ ||
508-
class.Control.Mode == controlmode.DIRECT {
509-
for _, inbCh := range channel.MergeInbound(t.GetParent().CollectInboundChannels(), class.Bind) {
510-
endpoint, ok := t.localBindMap[inbCh.Name]
511-
if !ok {
512-
log.WithFields(logrus.Fields{
513-
"channelName": inbCh.Name,
514-
"taskName": t.name,
515-
}).
516-
Error("endpoint not allocated for inbound channel")
517-
continue
518-
}
519-
520-
// We get the FairMQ-formatted propertyMap from the inbound channel spec
521-
chanProps := inbCh.ToFMQMap(endpoint)
522-
523-
// And we copy it into the task's propertyMap
524-
for k, v := range chanProps {
525-
propMap[k] = v
526-
}
527-
}
528-
for _, outboundCh := range channel.MergeOutbound(t.GetParent().CollectOutboundChannels(), class.Connect) {
529-
// We get the FairMQ-formatted propertyMap from the outbound channel spec
530-
chanProps := outboundCh.ToFMQMap(bindMap)
531-
532-
// And if valid, we copy it into the task's propertyMap
533-
if len(chanProps) > 0 {
534-
for k, v := range chanProps {
535-
propMap[k] = v
536-
}
537-
}
538-
}
539-
}
540541
}
541542
return propMap
542543
}

0 commit comments

Comments
 (0)