Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 75 additions & 18 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -193,6 +194,7 @@ type ptpProcess struct {
nodeProfile ptpv1.PtpProfile
logParser parser.MetricsExtractor
pmcCheck bool
clockClassRunning atomic.Bool
lastTransitionResult event.PTPState
clockType event.ClockType
ptpClockThreshold *ptpv1.PtpClockThreshold
Expand Down Expand Up @@ -227,6 +229,23 @@ func (p *ptpProcess) setStopped(val bool) {
p.execMutex.Unlock()
}

// TriggerPmcCheck sets pmcCheck to true in a thread-safe way
func (p *ptpProcess) TriggerPmcCheck() {
p.execMutex.Lock()
p.pmcCheck = true
p.execMutex.Unlock()
}

// ConsumePmcCheck atomically reads and resets the pmcCheck flag.
// It returns true if a PMC check should be performed.
func (p *ptpProcess) ConsumePmcCheck() bool {
p.execMutex.Lock()
val := p.pmcCheck
p.pmcCheck = false
p.execMutex.Unlock()
return val
}

// Daemon is the main structure for linuxptp instance.
// It contains all the necessary data to run linuxptp instance.
type Daemon struct {
Expand Down Expand Up @@ -872,7 +891,7 @@ func (dn *Daemon) HandlePmcTicker() {
if p.name == ptp4lProcessName {
// T-BC has different requirements for PMC polling. Handled in the T-BC event handler.
if p.nodeProfile.PtpSettings["clockType"] != TBC {
p.pmcCheck = true
p.TriggerPmcCheck()
}
}
}
Expand Down Expand Up @@ -919,23 +938,27 @@ func (p *ptpProcess) updateClockClass(c *net.Conn) {
if p.nodeProfile.PtpSettings["clockType"] == TBC || p.nodeProfile.PtpSettings["controllingProfile"] != "" {
return
}
// Per-process single-flight guard
if !p.clockClassRunning.CompareAndSwap(false, true) {
glog.Infof("clock class update already running for %s, skipping this run", p.configName)
return
}
defer p.clockClassRunning.Store(false)
defer func() {
if r := recover(); r != nil {
glog.Errorf("updateClockClass Recovered in f %#v", r)
}
}()

if r, e := pmc.RunPMCExpGetParentDS(p.configName); e == nil {
glog.Infof("%++v", r)

if r.GrandmasterClockClass != p.GrandmasterClockClass {
glog.Infof("clock change event identified: %d -> %d", p.GrandmasterClockClass, r.GrandmasterClockClass)
p.GrandmasterClockClass = r.GrandmasterClockClass
}
//ptp4l[5196819.100]: [ptp4l.0.config] CLOCK_CLASS_CHANGE:248
// change to pint every minute or when the clock class changes
if c == nil {
UpdateClockClassMetrics(float64(p.GrandmasterClockClass)) // no socket then update metrics
UpdateClockClassMetrics(p.name, float64(p.GrandmasterClockClass)) // no socket then update metrics
} else {
clockClassOut := fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", p.name, time.Now().Unix(), p.configName, p.GrandmasterClockClass)
_, err := (*c).Write([]byte(clockClassOut))
Expand Down Expand Up @@ -1018,12 +1041,6 @@ func (p *ptpProcess) cmdRun(stdoutToSocket bool, pm *plugin.PluginManager) {
if p.name == ptp4lProcessName {
if strings.Contains(output, ClockClassChangeIndicator) {
go p.updateClockClass(nil)
} else if p.pmcCheck {
p.pmcCheck = false
go p.updateClockClass(nil)
}
if profileClockType == TBC {
p.tBCTransitionCheck(output, pm)
}
} else if p.name == phc2sysProcessName && len(p.haProfile) > 0 {
p.announceHAFailOver(nil, output) // do not use go routine since order of execution is important here
Expand Down Expand Up @@ -1053,6 +1070,24 @@ func (p *ptpProcess) cmdRun(stdoutToSocket bool, pm *plugin.PluginManager) {
d.ProcessStatus(p.c, PtpProcessUp)
}
}
// moving outside scanner loop to ensure clock class update routine
// even if process hangs
go func() {
for {
select {
case <-p.exitCh:
glog.Infof("Exiting pmcCheck%s...", p.name)
return
default:
if p.ConsumePmcCheck() {
p.updateClockClass(p.c)
}
//Add a small sleep to avoid tight CPU loop
time.Sleep(100 * time.Millisecond)
}
}
}()

for scanner.Scan() {
output := scanner.Text()
if p.name == chronydProcessName {
Expand Down Expand Up @@ -1144,7 +1179,7 @@ func (p *ptpProcess) processPTPMetrics(output string) {
logEntry := synce.ParseLog(output)
p.ProcessSynceEvents(logEntry)
} else {
configName, source, ptpOffset, clockState, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output)
configName, source, ptpOffset, clockState, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output, p.c == nil)
p.hasCollectedMetrics = true
if iface != "" { // for ptp4l/phc2sys this function only update metrics
var values map[event.ValueType]interface{}
Expand All @@ -1166,6 +1201,16 @@ func (p *ptpProcess) processPTPMetrics(output string) {
state = event.PTP_HOLDOVER // consider s1 state as holdover,this passed to event to create metrics and events
}
p.ProcessTs2PhcEvents(ptpOffset, source, ifaceName, state, values)
} else if clockState == HOLDOVER || clockState == LOCKED {
// in case of holdover without iface, still need to update clock class for T_G
if p.name != ts2phcProcessName && p.name != syncEProcessName { // TGM announce clock class via events
p.ConsumePmcCheck() // reset pmc check since we are updating clock class here
// on faulty port or recovery of slave port there might be a clock class change
go func() {
time.Sleep(50 * time.Millisecond)
p.updateClockClass(p.c)
}()
}
}
}
}
Expand All @@ -1185,6 +1230,9 @@ func (p *ptpProcess) cmdStop() {
glog.Infof("%s setStopped true", p.name)

p.setStopped(true)
// reset runtime flags
p.ConsumePmcCheck()
p.clockClassRunning.Store(false)
if p.cmd.Process != nil {
glog.Infof("Sending TERM to (%s) PID: %d", p.name, p.cmd.Process.Pid)
err := p.cmd.Process.Signal(syscall.SIGTERM)
Expand Down Expand Up @@ -1287,13 +1335,19 @@ func (p *ptpProcess) ProcessTs2PhcEvents(ptpOffset float64, source string, iface
if iface != "" && iface != clockRealTime {
iface = utils.GetAlias(iface)
}
if p.c != nil {
return // no metrics when socket is used
}
switch ptpState {
case event.PTP_LOCKED:
updateClockStateMetrics(p.name, iface, LOCKED)
case event.PTP_FREERUN:
updateClockStateMetrics(p.name, iface, FREERUN)
case event.PTP_HOLDOVER:
updateClockStateMetrics(p.name, iface, HOLDOVER)
if p.clockType != TGM { // TGM announce clock class via events
go p.updateClockClass(p.c)
}
}
}
}
Expand Down Expand Up @@ -1543,10 +1597,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
ExtendedSSM: 0,
})
state = sDeviceConfig.LastClockState
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", logEntry.QL)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", synce.QL_DEFAULT_ENHSSM)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(logEntry.QL)+int(synce.QL_DEFAULT_ENHSSM))

if p.c == nil { // only update metrics if no socket is used
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", logEntry.QL)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", synce.QL_DEFAULT_ENHSSM)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(logEntry.QL)+int(synce.QL_DEFAULT_ENHSSM))
}
} else if sDeviceConfig.ExtendedTlv == synce.ExtendedTLV_ENABLED {
var lastQLState *synce.QualityLevelInfo
var ok bool
Expand All @@ -1570,9 +1625,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
ExtendedSSM: lastQLState.ExtendedSSM,
Priority: 0,
})
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", lastQLState.SSM)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", logEntry.ExtQl)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(lastQLState.SSM)+int(logEntry.ExtQl))
if p.c == nil {
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", lastQLState.SSM)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", logEntry.ExtQl)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(lastQLState.SSM)+int(logEntry.ExtQl))
}

state = sDeviceConfig.LastClockState
} else if logEntry.QL != synce.QL_DEFAULT_SSM { //else we have only QL
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (tc *TestCase) cleanupMetrics() {
daemon.FrequencyAdjustment.With(map[string]string{"from": tc.from, "process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
daemon.Delay.With(map[string]string{"from": tc.from, "process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
daemon.ClockState.With(map[string]string{"process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "node": tc.node}).Set(CLEANUP)
daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "config": "ptp4l.0.config", "node": tc.node}).Set(CLEANUP)
daemon.InterfaceRole.With(map[string]string{"process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
}

Expand Down Expand Up @@ -590,7 +590,7 @@ func Test_ProcessPTPMetrics(t *testing.T) {
assert.Equal(tc.expectedClockState, testutil.ToFloat64(clockState), "ClockState does not match\n%s", tc.String())
}
if tc.expectedClockClassMetrics != SKIP {
clockClassMetrics := daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "node": tc.node})
clockClassMetrics := daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "config": "ptp4l.0.config", "node": tc.node})
assert.Equal(tc.expectedClockClassMetrics, testutil.ToFloat64(clockClassMetrics), "ClockClassMetrics does not match\n%s", tc.String())
}
if tc.expectedInterfaceRole != SKIP {
Expand Down
23 changes: 14 additions & 9 deletions pkg/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ var (
Subsystem: PTPSubsystem,
Name: "clock_class",
Help: "6 = Locked, 7 = PRC unlocked in-spec, 52/187 = PRC unlocked out-of-spec, 135 = T-BC holdover in-spec, 165 = T-BC holdover out-of-spec, 248 = Default, 255 = Slave Only Clock",
}, []string{"process", "node"})
}, []string{"process", "node", "config"})

// InterfaceRole metrics to show current interface role
InterfaceRole = prometheus.NewGaugeVec(
Expand Down Expand Up @@ -257,7 +257,7 @@ func updatePTPMetrics(from, process, iface string, ptpOffset, maxPtpOffset, freq
}

// extractMetrics ...
func extractMetrics(messageTag string, processName string, ifaces config.IFaces, output string) (configName, source string, offset float64, state string, iface string) {
func extractMetrics(messageTag string, processName string, ifaces config.IFaces, output string, updateMetrics bool) (configName, source string, offset float64, state string, iface string) {
configName = strings.Replace(strings.Replace(messageTag, "]", "", 1), "[", "", 1)
if configName != "" {
configName = strings.Split(configName, MessageTagSuffixSeperator)[0] // remove any suffix added to the configName
Expand Down Expand Up @@ -288,8 +288,10 @@ func extractMetrics(messageTag string, processName string, ifaces config.IFaces,
if offsetSource == master {
masterOffsetSource.set(configName, processName)
}
updatePTPMetrics(offsetSource, processName, ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay)
updateClockStateMetrics(processName, ifaceName, clockstate)
if updateMetrics {
updatePTPMetrics(offsetSource, processName, ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay)
updateClockStateMetrics(processName, ifaceName, clockstate)
}
}
source = processName
offset = ptpOffset
Expand All @@ -306,11 +308,14 @@ func extractMetrics(messageTag string, processName string, ifaces config.IFaces,
} else if role == FAULTY {
if slaveIface.isFaulty(configName, ifaces[portId-1].Name) &&
masterOffsetSource.get(configName) == ptp4lProcessName {
updatePTPMetrics(master, processName, masterOffsetIface.get(configName).alias, faultyOffset, faultyOffset, 0, 0)
updatePTPMetrics(phc, phc2sysProcessName, clockRealTime, faultyOffset, faultyOffset, 0, 0)
updateClockStateMetrics(processName, masterOffsetIface.get(configName).alias, FREERUN)
if updateMetrics {
updatePTPMetrics(master, processName, masterOffsetIface.get(configName).alias, faultyOffset, faultyOffset, 0, 0)
updatePTPMetrics(phc, phc2sysProcessName, clockRealTime, faultyOffset, faultyOffset, 0, 0)
updateClockStateMetrics(processName, masterOffsetIface.get(configName).alias, FREERUN)
}
masterOffsetIface.set(configName, "")
slaveIface.set(configName, "")
state = HOLDOVER
}
}
}
Expand Down Expand Up @@ -529,9 +534,9 @@ func UpdateInterfaceRoleMetrics(process string, iface string, role ptpPortRole)
}

// UpdateClockClassMetrics ... update clock class metrics
func UpdateClockClassMetrics(clockClass float64) {
func UpdateClockClassMetrics(cfgName string, clockClass float64) {
ClockClassMetrics.With(prometheus.Labels{
"process": ptp4lProcessName, "node": NodeName}).Set(float64(clockClass))
"process": ptp4lProcessName, "config": cfgName, "node": NodeName}).Set(float64(clockClass))
}

func UpdateProcessStatusMetrics(process, cfgName string, status int64) {
Expand Down
19 changes: 12 additions & 7 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,7 @@ func Init(nodeName string, stdOutToSocket bool, socketName string, processChanne
downstreamParentDataSet: &protocol.ParentDataSet{},
},
}
if clockClassMetric != nil {
clockClassMetric.With(prometheus.Labels{
"process": PTP4lProcessName, "node": nodeName}).Set(248)
}

StateRegisterer = NewStateNotifier()
return ptpEvent

Expand Down Expand Up @@ -686,7 +683,9 @@ connect:
if event.WriteToLog && logDataValues != "" {
logOut = append(logOut, logDataValues)
}
e.UpdateClockStateMetrics(event.State, string(event.ProcessName), event.IFace)
if !e.stdoutToSocket {
e.UpdateClockStateMetrics(event.State, string(event.ProcessName), event.IFace)
}
} else {

// Update the in MemData
Expand Down Expand Up @@ -896,6 +895,9 @@ func (e *EventHandler) GetPTPState(source EventSource, cfgName string) PTPState

// UpdateClockStateMetrics ...
func (e *EventHandler) UpdateClockStateMetrics(state PTPState, process, iFace string) {
if e.stdoutToSocket {
return
}
labels := prometheus.Labels{
"process": process, "node": e.nodeName, "iface": iFace}
if state == PTP_LOCKED {
Expand Down Expand Up @@ -990,6 +992,9 @@ func registerMetrics(m *prometheus.GaugeVec) {
}

func (e *EventHandler) unregisterMetrics(configName string, processName string) {
if e.stdoutToSocket {
return // no need to unregister metrics if events are going to socket
}
if data, ok := e.data[configName]; ok {
for _, v := range data {
if string(v.ProcessName) == processName || processName == "" {
Expand Down Expand Up @@ -1056,9 +1061,9 @@ func (e *EventHandler) UpdateClockClass(c net.Conn, clk ClockClassRequest) {
} else {
glog.Errorf("failed to write class change event, connection is nil")
}
} else {
} else if e.clockClassMetric != nil {
e.clockClassMetric.With(prometheus.Labels{
"process": PTP4lProcessName, "node": e.nodeName}).Set(float64(clockClass))
"process": PTP4lProcessName, "config": clk.cfgName, "node": e.nodeName}).Set(float64(clockClass))
}
fmt.Printf("%s", clockClassOut)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/event/event_tbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ func (e *EventHandler) announceClockClass(clockClass int, cfgName string, c net.
} else {
glog.Errorf("failed to write class change event, connection is nil")
}
} else {
} else if e.clockClassMetric != nil {
e.clockClassMetric.With(prometheus.Labels{
"process": PTP4lProcessName, "node": e.nodeName}).Set(float64(clockClass))
"process": PTP4lProcessName, "config": cfgName, "node": e.nodeName}).Set(float64(clockClass))
}
glog.Infof("%s", message)
}
Expand Down