Skip to content

Commit

Permalink
[CNIServer] Remove stale OVS interfaces in reconcile if the original …
Browse files Browse the repository at this point in the history
…Pod interface is disconnected

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Jan 14, 2025
1 parent 5ee28ec commit 82a29d6
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 54 deletions.
88 changes: 48 additions & 40 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,51 +460,59 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
namespace := containerConfig.PodNamespace
name := containerConfig.PodName
namespacedName := k8s.NamespacedName(namespace, name)
if desiredPods.Has(namespacedName) {
// Find the OVS ports which are not connected to host interfaces. This is useful on Windows if the runtime is
// containerd, because the host interface is created async from the OVS port.
if containerConfig.OFPort == -1 {
missingIfConfigs = append(missingIfConfigs, containerConfig)
continue
}
podWg.Add(1)
go func(containerID, pod, namespace string) {
defer podWg.Done()
// Do not install Pod flows until all preconditions are met.
podNetworkWait.Wait()
// To avoid race condition with CNIServer CNI event handlers.
containerAccess.lockContainer(containerID)
defer containerAccess.unlockContainer(containerID)

containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID)
if !exists {
klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, name), "containerID", containerID)
return
}
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name))
}
}(containerConfig.ContainerID, name, namespace)
} else {
// clean-up and delete interface

// Find the OVS ports corresponding to Pods which no longer exist. This includes the case that the Pod using the
// Namespaced name constructed from OVSDB does not exist in kube-apiserver, and the case that a Pod with the
// same Namespaced name exists in kube-apiserver but the host interface is disconnected.
// Note: a Pod's host interface is generated by both Pod name and the sandbox container ID, so the new Pod with
// the same name would have a different host interface associated to it.
if !desiredPods.Has(namespacedName) || pc.isInterfaceInvalid(containerConfig) {
klog.V(4).InfoS("Deleting interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.removeInterfaces(containerConfig.ContainerID); err != nil {
klog.ErrorS(err, "Failed to delete interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
}
// interface should no longer be in store after the call to removeInterfaces
continue
}

// This should only happen on Windows Nodes because if this condition (OFPort is -1) is satisfied on Linux,
// the call to isInterfaceInvalid above will return true, the interface will be removed, and this code will
// not be executed. On Windows, we may create an OVS port but not connect to the host interface when agent
// is down, so OVS has no chance to allocate a valid OpenFlow port to the interface.
if containerConfig.OFPort == -1 {
missingIfConfigs = append(missingIfConfigs, containerConfig)
continue
}

podWg.Add(1)
go func(containerID, pod, namespace string) {
defer podWg.Done()
// Do not install Pod flows until all preconditions are met.
podNetworkWait.Wait()
// To avoid race condition with CNIServer CNI event handlers.
containerAccess.lockContainer(containerID)
defer containerAccess.unlockContainer(containerID)

containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID)
if !exists {
klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, name), "containerID", containerID)
return
}
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name))
}
}(containerConfig.ContainerID, name, namespace)
}
go func() {
defer flowRestoreCompleteWait.Done()
Expand Down
19 changes: 6 additions & 13 deletions pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,13 @@ func (pc *podConfigurator) configureInterfaces(
containerIFDev, mtu, sriovVFDeviceID, result, containerAccess)
}

// reconcileMissingPods is never called on Linux, see reconcile logic.
func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) {
for i := range ifConfigs {
// This should not happen since OVSDB is persisted on the Node.
// TODO: is there anything else we should be doing? Assuming that the Pod's
// interface still exists, we can repair the interface store since we can
// retrieve the name of the host interface for the Pod by calling
// GenerateContainerInterfaceName. One thing we would not be able to
// retrieve is the container ID which is part of the container configuration
// we store in the cache, but this ID is not used for anything at the
// moment. However, if the interface does not exist, there is nothing we can
// do since we do not have the original CNI parameters.
ifaceConfig := ifConfigs[i]
klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName)
}
}

// isInterfaceInvalid returns true if the OVS interface's ofport is "-1" which means the host interface is disconnected.
func (pc *podConfigurator) isInterfaceInvalid(ifaceConfig *interfacestore.InterfaceConfig) bool {
return ifaceConfig.OFPort == -1
}

func (pc *podConfigurator) initPortStatusMonitor(_ cache.SharedIndexInformer) {
Expand Down
11 changes: 10 additions & 1 deletion pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (

const (
podNotReadyTimeInSeconds = 30 * time.Second
ovsInterfaceTypeForPod = "internal"
)

// connectInterfaceToOVSAsync waits for an interface to be created and connects it to OVS br-int asynchronously
Expand All @@ -56,7 +57,7 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I
// before the time, then the library shall merge the event.
pc.unreadyPortQueue.AddAfter(ovsPortName, podNotReadyTimeInSeconds)
return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error {
if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil {
if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, ovsInterfaceTypeForPod); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -127,6 +128,14 @@ func (pc *podConfigurator) configureInterfaces(
containerIFDev, mtu, sriovVFDeviceID, result, containerAccess)
}

// isInterfaceInvalid returns false because we now don't support detecting the disconnected host interface on Windows
// due to the OVS issue (https://github.com/openvswitch/ovs-issues/issues/353), by which we can't differentiate from
// the case that a Pod's host interface is created during agent downtime and is expected to re-connect after agent
// is restarted.
func (pc *podConfigurator) isInterfaceInvalid(ifaceConfig *interfacestore.InterfaceConfig) bool {
return false
}

func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) {
for i := range ifConfigs {
ifaceConfig := ifConfigs[i]
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/cniserver/server_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,12 +634,16 @@ func TestReconcile(t *testing.T) {
close(podFlowsInstalled)
}).Times(1)
mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1)
mockOFClient.EXPECT().UninstallPodFlows(unconnectedInterface.InterfaceName).Return(nil).Times(1)
mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1)
mockOVSBridgeClient.EXPECT().DeletePort(unconnectedInterface.PortUUID).Return(nil).Times(1)
mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1)
err := cniServer.reconcile()
assert.NoError(t, err)
_, exists := ifaceStore.GetInterfaceByName(staleInterface.InterfaceName)
assert.False(t, exists)
_, exists = ifaceStore.GetInterfaceByName(unconnectedInterface.InterfaceName)
assert.False(t, exists)
select {
case <-podFlowsInstalled:
case <-time.After(500 * time.Millisecond):
Expand Down

0 comments on commit 82a29d6

Please sign in to comment.