Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions internal/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ type nvidiaDevicePlugin struct {

socket string
server *grpc.Server
health chan *rm.Device
stop chan interface{}

// Health monitoring
healthProvider rm.HealthProvider
cancel context.CancelFunc

imexChannels imex.Channels

Expand Down Expand Up @@ -90,11 +92,9 @@ func (o *options) devicePluginForResource(ctx context.Context, resourceManager r
mps: mpsOptions,

socket: getPluginSocketPath(resourceManager.Resource()),
// These will be reinitialized every
// time the plugin server is restarted.
// server and healthProvider will be reinitialized every time
// the plugin server is restarted.
server: nil,
health: nil,
stop: nil,
}
return &plugin, nil
}
Expand All @@ -108,15 +108,11 @@ func getPluginSocketPath(resource spec.ResourceName) string {

func (plugin *nvidiaDevicePlugin) initialize() {
plugin.server = grpc.NewServer([]grpc.ServerOption{}...)
plugin.health = make(chan *rm.Device)
plugin.stop = make(chan interface{})
}

func (plugin *nvidiaDevicePlugin) cleanup() {
close(plugin.stop)
plugin.server = nil
plugin.health = nil
plugin.stop = nil
plugin.healthProvider = nil
}

// Devices returns the full set of devices associated with the plugin.
Expand Down Expand Up @@ -148,13 +144,14 @@ func (plugin *nvidiaDevicePlugin) Start(kubeletSocket string) error {
}
klog.Infof("Registered device plugin for '%s' with Kubelet", plugin.rm.Resource())

go func() {
// TODO: add MPS health check
err := plugin.rm.CheckHealth(plugin.stop, plugin.health)
if err != nil {
klog.Errorf("Failed to start health check: %v; continuing with health checks disabled", err)
}
}()
// Initialize and start health provider
plugin.ctx, plugin.cancel = context.WithCancel(context.Background())
plugin.healthProvider = plugin.rm.HealthProvider()

// TODO: add MPS health check
if err := plugin.healthProvider.Start(plugin.ctx); err != nil {
klog.Errorf("Failed to start health provider: %v; continuing with health checks disabled", err)
}

return nil
}
Expand All @@ -164,6 +161,13 @@ func (plugin *nvidiaDevicePlugin) Stop() error {
if plugin == nil || plugin.server == nil {
return nil
}

// Stop health monitoring
plugin.healthProvider.Stop()
if plugin.cancel != nil {
plugin.cancel()
}

klog.Infof("Stopping to serve '%s' on %s", plugin.rm.Resource(), plugin.socket)
plugin.server.Stop()
if err := os.Remove(plugin.socket); err != nil && !os.IsNotExist(err) {
Expand Down Expand Up @@ -271,11 +275,11 @@ func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.D

for {
select {
case <-plugin.stop:
case <-plugin.ctx.Done():
return nil
case d := <-plugin.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
case d := <-plugin.healthProvider.Health():
// Device became unhealthy
// Device.Health already set to Unhealthy by health provider
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
return nil
Expand Down
Loading