Skip to content

Commit

Permalink
big refactor encapsulate the host inventory in its own module
Browse files Browse the repository at this point in the history
Change-Id: If45b7f5dc558c750c10110a2937947ec4b224c30
  • Loading branch information
aojea committed Dec 14, 2024
1 parent e4eabbe commit 31535cc
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 262 deletions.
106 changes: 18 additions & 88 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package driver
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"slices"
Expand All @@ -29,8 +28,7 @@ import (
"github.com/Mellanox/rdmamap"
"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"github.com/vishvananda/netlink"
"golang.org/x/time/rate"
"github.com/google/dranet/pkg/inventory"

resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -82,6 +80,7 @@ type NetworkDriver struct {

podAllocations storage
claimAllocations storage
netdb inventory.DB
}

type Option func(*NetworkDriver)
Expand Down Expand Up @@ -149,6 +148,15 @@ func Start(ctx context.Context, driverName string, kubeClient kubernetes.Interfa
}
}()

// register the host network interfaces
netdb := inventory.New()
go func() {
err = netdb.Run(ctx)
if err != nil {
klog.Infof("Network Device DB failed with error %v", err)
}
}()

// publish available resources
go plugin.PublishResources(ctx)

Expand Down Expand Up @@ -286,98 +294,20 @@ func (np *NetworkDriver) RemovePodSandbox(_ context.Context, pod *api.PodSandbox

func (np *NetworkDriver) PublishResources(ctx context.Context) {
klog.V(2).Infof("Publishing resources")

minInterval := 5 * time.Second
maxInterval := 1 * time.Minute
rateLimiter := rate.NewLimiter(rate.Every(minInterval), 1)
// Resources are published periodically or when a netlink notification
// indicates that an interface was added or changed
nlChannel := make(chan netlink.LinkUpdate)
doneCh := make(chan struct{})
defer close(doneCh)
if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil {
klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String())
}

// Obtain data that will not change after the startup
getInstanceProperties(ctx)
// TODO: it is not common but may happen in edge cases that the default gateway changes
// revisit once we have more evidence this can be a potential problem or break some use
// cases.
gwInterfaces := getDefaultGwInterfaces()

for {
err := rateLimiter.Wait(ctx)
if err != nil {
klog.Error(err, "unexpected rate limited error trying to get system interfaces")
}

resources := kubeletplugin.Resources{}
ifaces, err := net.Interfaces()
if err != nil {
klog.Error(err, "unexpected error trying to get system interfaces")
}
for _, iface := range ifaces {
klog.V(7).InfoS("Checking network interface", "name", iface.Name)
if gwInterfaces.Has(iface.Name) {
klog.V(4).Infof("iface %s is an uplink interface", iface.Name)
continue
}

// skip loopback interfaces
if iface.Flags&net.FlagLoopback != 0 {
continue
}

// publish this network interface
device, err := netdevToDRAdev(iface.Name)
if err != nil {
klog.V(2).Infof("could not obtain attributes for iface %s : %v", iface.Name, err)
continue
}

resources.Devices = append(resources.Devices, *device)
klog.V(4).Infof("Found following network interfaces %s", iface.Name)
}

// List RDMA devices
rdmaIfaces, err := netlink.RdmaLinkList()
if err != nil {
klog.Error(err, "could not obtain the list of RDMA resources")
}

for _, iface := range rdmaIfaces {
klog.V(7).InfoS("Checking rdma interface", "name", iface.Attrs.Name)
// publish this RDMA interface
device, err := rdmaToDRAdev(iface.Attrs.Name)
if err != nil {
klog.V(2).Infof("could not obtain attributes for iface %s : %v", iface.Attrs.Name, err)
continue
}

resources.Devices = append(resources.Devices, *device)
klog.V(4).Infof("Found following network interfaces %s", iface.Attrs.Name)
}

if len(resources.Devices) > 0 {
select {
case resources := <-np.netdb.GetResources(ctx):
err := np.draPlugin.PublishResources(ctx, resources)
if err != nil {
klog.Error(err, "unexpected error trying to publish resources")
continue
}
case <-ctx.Done():
klog.Error(ctx.Err(), "context canceled")
return
}

select {
// trigger a reconcile
case <-nlChannel:
// drain the channel so we only sync once
for len(nlChannel) > 0 {
<-nlChannel
}
case <-time.After(maxInterval):
}
// poor man's rate limit
time.Sleep(3 * time.Second)
}

}

// NodePrepareResources filter the Claim requested for this driver
Expand Down
64 changes: 0 additions & 64 deletions pkg/driver/sriov.go

This file was deleted.

31 changes: 22 additions & 9 deletions pkg/driver/cloud.go → pkg/inventory/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package driver
package inventory

import (
"context"
Expand All @@ -28,9 +28,6 @@ import (
)

var (
// use global variables for the GCE instance information since it should be immutable once set
instance *cloudInstance

// cloud provider specific

// https://cloud.google.com/compute/docs/accelerator-optimized-machines#network-protocol
Expand Down Expand Up @@ -62,20 +59,24 @@ type networkInterface struct {

// getInstanceProperties gets the instance properties and returns them (nil if they could not be obtained) to be used in discovery
// TODO(aojea) support more cloud providers
func getInstanceProperties(ctx context.Context) {
func getInstanceProperties(ctx context.Context) *cloudInstance {
var err error
var instance *cloudInstance
if metadata.OnGCE() {
// Get google compute instance metadata for network interfaces
// https://cloud.google.com/compute/docs/metadata/predefined-metadata-keys
klog.Infof("running on GCE")
err = getGCEInstance(ctx)
instance, err = getGCEInstance(ctx)
}
if err != nil {
klog.Infof("could not get instance properties: %v", err)
return nil
}
return instance
}

func getGCEInstance(ctx context.Context) error {
func getGCEInstance(ctx context.Context) (*cloudInstance, error) {
var instance *cloudInstance
// the metadata server may not be available yet during startup, so poll until it answers or the timeout expires
err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 15*time.Second, true, func(ctx context.Context) (done bool, err error) {
instanceName, err := metadata.InstanceNameWithContext(ctx)
Expand Down Expand Up @@ -109,7 +110,19 @@ func getGCEInstance(ctx context.Context) error {
return true, nil
})
if err != nil {
return err
return nil, err
}
return instance, nil
}

func cloudNetwork(mac string, instance *cloudInstance) string {
if instance == nil {
return ""
}
for _, cloudInterface := range instance.Interfaces {
if cloudInterface.Mac == mac {
return cloudInterface.Network
}
}
return nil
return ""
}
Loading

0 comments on commit 31535cc

Please sign in to comment.