Skip to content

Commit

Permalink
use kube 1.32
Browse files Browse the repository at this point in the history
Change-Id: Id49de2c9406cc9133a6045b573eb484f956e965c
  • Loading branch information
aojea committed Dec 14, 2024
1 parent 31535cc commit 6bb54c3
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
5 changes: 4 additions & 1 deletion kind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ containerdConfigPatches:
disable = false
nodes:
- role: control-plane
image: kindest/node:v1.32.0
kubeadmConfigPatches:
# Enable the corresponding version of the resource.k8s.io API
- |
Expand All @@ -37,13 +38,15 @@ nodes:
kubeletExtraArgs:
v: "5"
- role: worker
image: kindest/node:v1.32.0
kubeadmConfigPatches:
- |
kind: JoinConfiguration
nodeRegistration:
kubeletExtraArgs:
v: "5"
- role: worker
image: kindest/node:v1.32.0
kubeadmConfigPatches:
- |
kind: JoinConfiguration
Expand All @@ -54,4 +57,4 @@ featureGates:
# Enable the corresponding DRA feature gates
DynamicResourceAllocation: true
runtimeConfig:
api/all : true
api/beta : true
12 changes: 8 additions & 4 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type NetworkDriver struct {

podAllocations storage
claimAllocations storage
netdb inventory.DB
netdb *inventory.DB
}

type Option func(*NetworkDriver)
Expand Down Expand Up @@ -149,9 +149,9 @@ func Start(ctx context.Context, driverName string, kubeClient kubernetes.Interfa
}()

// register the host network interfaces
netdb := inventory.New()
plugin.netdb = inventory.New()
go func() {
err = netdb.Run(ctx)
err = plugin.netdb.Run(ctx)
if err != nil {
klog.Infof("Network Device DB failed with error %v", err)
}
Expand Down Expand Up @@ -296,7 +296,11 @@ func (np *NetworkDriver) PublishResources(ctx context.Context) {
klog.V(2).Infof("Publishing resources")
for {
select {
case resources := <-np.netdb.GetResources(ctx):
case devices := <-np.netdb.GetResources(ctx):
klog.V(4).Infof("Received %d devices", len(devices))
resources := kubeletplugin.Resources{
Devices: devices,
}
err := np.draPlugin.PublishResources(ctx, resources)
if err != nil {
klog.Error(err, "unexpected error trying to publish resources")
Expand Down
20 changes: 10 additions & 10 deletions pkg/inventory/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"golang.org/x/time/rate"
resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
Expand All @@ -53,7 +52,7 @@ type DB struct {
store map[string]resourceapi.Device

rateLimiter *rate.Limiter
notifications chan kubeletplugin.Resources
notifications chan []resourceapi.Device
}

type Device struct {
Expand All @@ -65,7 +64,7 @@ func New() *DB {
return &DB{
rateLimiter: rate.NewLimiter(rate.Every(minInterval), 1),
store: map[string]resourceapi.Device{},
notifications: make(chan kubeletplugin.Resources),
notifications: make(chan []resourceapi.Device),
}
}

Expand Down Expand Up @@ -93,7 +92,7 @@ func (db *DB) Run(ctx context.Context) error {
klog.Error(err, "unexpected rate limited error trying to get system interfaces")
}

resources := kubeletplugin.Resources{}
devices := []resourceapi.Device{}
ifaces, err := net.Interfaces()
if err != nil {
klog.Error(err, "unexpected error trying to get system interfaces")
Expand All @@ -117,7 +116,7 @@ func (db *DB) Run(ctx context.Context) error {
continue
}

resources.Devices = append(resources.Devices, *device)
devices = append(devices, *device)
klog.V(4).Infof("Found following network interface %s", iface.Name)
}

Expand All @@ -136,12 +135,13 @@ func (db *DB) Run(ctx context.Context) error {
continue
}

resources.Devices = append(resources.Devices, *device)
devices = append(devices, *device)
klog.V(4).Infof("Found following rdma interface %s", iface.Attrs.Name)
}

if len(resources.Devices) > 0 {
db.notifications <- resources
klog.V(4).Infof("Found %d devices", len(devices))
if len(devices) > 0 {
db.notifications <- devices
}
select {
// trigger a reconcile
Expand All @@ -157,7 +157,7 @@ func (db *DB) Run(ctx context.Context) error {
}
}

func (db *DB) GetResources(ctx context.Context) <-chan kubeletplugin.Resources {
func (db *DB) GetResources(ctx context.Context) <-chan []resourceapi.Device {
return db.notifications
}

Expand Down Expand Up @@ -187,7 +187,7 @@ func (db *DB) netdevToDRAdev(ifName string) (*resourceapi.Device, error) {

if ips, err := netlink.AddrList(link, netlink.FAMILY_ALL); err == nil && len(ips) > 0 {
// TODO assume only one addres by now
ip := ips[0].String()
ip := ips[0].IP.String()
device.Basic.Attributes["ip"] = resourceapi.DeviceAttribute{StringValue: &ip}
mac := link.Attrs().HardwareAddr.String()
device.Basic.Attributes["mac"] = resourceapi.DeviceAttribute{StringValue: &mac}
Expand Down

0 comments on commit 6bb54c3

Please sign in to comment.