[Windows] Reconcile host-network Pods after agent is restarted (#6944)
This change supports reconciling host-network Pods on Windows, because
Kubernetes expects the CNI to manage such Pods as long as they are not using
host-process containers.

Antrea received the CmdAdd request for such Pods when they were created, so
they should be included in the Pod reconcile list after the agent is restarted.

Signed-off-by: Wenying Dong <[email protected]>
wenyingd authored Feb 3, 2025
1 parent 9a76513 commit 71c4290
Showing 6 changed files with 164 additions and 35 deletions.
4 changes: 0 additions & 4 deletions pkg/agent/cniserver/pod_configuration.go
@@ -445,10 +445,6 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
var podWg sync.WaitGroup

for _, pod := range pods {
// Skip Pods for which we are not in charge of the networking.
if pod.Spec.HostNetwork {
continue
}
desiredPods.Insert(k8s.NamespacedName(pod.Namespace, pod.Name))
for _, podIP := range pod.Status.PodIPs {
desiredPodIPs.Insert(podIP.IP)
14 changes: 7 additions & 7 deletions pkg/agent/cniserver/server.go
@@ -29,7 +29,6 @@ import (
"github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ip"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@@ -756,14 +755,15 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// reconcile performs startup reconciliation for the CNI server. The CNI server is in charge of
// installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
// K8s apiserver and replay the necessary flows.
// The Pods are processed in reconcile as below,
// | Pod Type | Spec.HostNetwork | windowsOptions.hostProcess | OVS interface needed? | List Pods in reconcile |
// | Normal Pod (non-HostNetwork) | false | false or N/A | Yes | Yes |
// | Linux HostNetwork Pod | true | N/A | No | No |
// | Windows HostNetwork Pod | true | false | Yes | Yes |
// | Windows HostProcess Pod | true | true | No | Yes |
func (s *CNIServer) reconcile() error {
klog.InfoS("Starting reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + s.nodeConfig.Name,
ResourceVersion: "0",
})
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), s.getPodsListOptions())
if err != nil {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}
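The reconcile table above mentions "windowsOptions.hostProcess", but no code in this diff inspects that flag directly: host-process Pods are listed on Windows, yet they need no OVS interface, so reconcile has nothing to replay for them. For illustration only, a minimal sketch of where that flag lives in the Pod spec is shown below; the helper name isHostProcessPod is hypothetical and not part of Antrea.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// isHostProcessPod reports whether a Pod uses host-process containers.
// Kubernetes requires every container of a host-process Pod to agree on the
// flag, so checking the Pod-level securityContext and each container's
// securityContext covers both ways of setting it.
func isHostProcessPod(pod *corev1.Pod) bool {
	if sc := pod.Spec.SecurityContext; sc != nil && sc.WindowsOptions != nil &&
		sc.WindowsOptions.HostProcess != nil && *sc.WindowsOptions.HostProcess {
		return true
	}
	for _, c := range pod.Spec.Containers {
		csc := c.SecurityContext
		if csc != nil && csc.WindowsOptions != nil &&
			csc.WindowsOptions.HostProcess != nil && *csc.WindowsOptions.HostProcess {
			return true
		}
	}
	return false
}

func main() {
	hostProcess := true
	pod := &corev1.Pod{}
	pod.Spec.SecurityContext = &corev1.PodSecurityContext{
		WindowsOptions: &corev1.WindowsSecurityContextOptions{HostProcess: &hostProcess},
	}
	fmt.Println(isHostProcessPod(pod)) // prints "true"
}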
17 changes: 16 additions & 1 deletion pkg/agent/cniserver/server_linux.go
@@ -14,7 +14,12 @@

package cniserver

import current "github.com/containernetworking/cni/pkg/types/100"
import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// updateResultDNSConfig updates the DNS config from CNIConfig.
func updateResultDNSConfig(result *current.Result, cniConfig *CNIConfig) {
@@ -48,3 +53,13 @@ func validateRuntime(netNS string) error {
func (c *CNIConfig) getInfraContainer() string {
return c.ContainerId
}

// getPodsListOptions returns the ListOptions used to list the non-host-network Pods running on the current Node.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
return metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", s.nodeConfig.Name),
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
ResourceVersion: "0",
}
}
13 changes: 13 additions & 0 deletions pkg/agent/cniserver/server_windows.go
@@ -22,6 +22,7 @@ import (
"strings"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

@@ -98,3 +99,15 @@ func getInfraContainer(containerID, netNS string) string {
func (c *CNIConfig) getInfraContainer() string {
return getInfraContainer(c.ContainerId, c.Netns)
}

// getPodsListOptions returns the ListOptions used to list the Pods running on the current Node. Note that host-network
// Pods are not filtered out on Windows because they are also managed by Antrea as long as "spec.SecurityContext.windowsOptions.hostProcess"
// is not configured.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
return metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name),
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
ResourceVersion: "0",
}
}
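The two getPodsListOptions implementations differ only in the field selector; everything else about the List request is shared. A minimal, self-contained sketch of the resulting apiserver query is shown below, assuming a standard in-cluster client-go setup. The standalone program and Node name are illustrative, not part of the commit; in Antrea the call is made from CNIServer.reconcile with its own kubeClient.

package main

import (
	"context"
	"fmt"
	"runtime"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	nodeName := "worker-node-1" // illustrative Node name

	// Linux excludes host-network Pods at the apiserver; Windows lists them as
	// well, because the CNI also manages host-network (non host-process) Pods.
	fieldSelector := fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", nodeName)
	if runtime.GOOS == "windows" {
		fieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeName)
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	pods, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
		FieldSelector: fieldSelector,
		// ResourceVersion "0" lets kube-apiserver serve the request from its
		// watch cache rather than etcd, which is cheaper on agent restart.
		ResourceVersion: "0",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d Pods to consider for reconciliation on Node %s\n", len(pods.Items), nodeName)
}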
31 changes: 28 additions & 3 deletions pkg/agent/cniserver/server_windows_test.go
@@ -46,13 +46,30 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
)

var (
containerMACStr = "23:34:56:23:22:45"
dnsSearches = []string{"a.b.c.d"}

mockWinnet *winnettest.MockInterface

interfaceForHostNetworkPod = &interfacestore.InterfaceConfig{
InterfaceName: "iface2",
Type: interfacestore.ContainerInterface,
IPs: []net.IP{net.ParseIP("1.1.1.2")},
MAC: utilip.MustParseMAC("00:11:22:33:44:02"),
OVSPortConfig: &interfacestore.OVSPortConfig{
PortUUID: generateUUID(),
OFPort: int32(4),
},
ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
PodName: pod2.Name,
PodNamespace: testPodNamespace,
ContainerID: generateUUID(),
},
}
)

func TestUpdateResultDNSConfig(t *testing.T) {
@@ -732,7 +749,7 @@ func TestReconcile(t *testing.T) {
cniServer := newCNIServer(t)
cniServer.routeClient = mockRoute
cniServer.kubeClient = kubeClient
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} {
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface, interfaceForHostNetworkPod} {
ifaceStore.AddInterface(containerIface)
}
waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh)
@@ -741,11 +758,19 @@
go cniServer.podConfigurator.Run(stopCh)

// Re-install Pod1 flows
podFlowsInstalled := make(chan string, 2)
expReinstalledPodCount := 3
podFlowsInstalled := make(chan string, expReinstalledPodCount)
mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Re-install host-network Pod (Pod2) flows
mockOFClient.EXPECT().InstallPodFlows(interfaceForHostNetworkPod.InterfaceName, interfaceForHostNetworkPod.IPs, interfaceForHostNetworkPod.MAC, uint32(interfaceForHostNetworkPod.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Uninstall Pod3 flows which is deleted.
mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1)
mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1)
@@ -778,7 +803,7 @@
assert.NoError(t, err)
_, exists := ifaceStore.GetInterfaceByName("iface3")
assert.False(t, exists)
for i := 0; i < 2; i++ {
for i := 0; i < expReinstalledPodCount; i++ {
select {
case <-podFlowsInstalled:
case <-time.After(500 * time.Millisecond):
120 changes: 100 additions & 20 deletions test/e2e/connectivity_test.go
@@ -70,6 +70,10 @@ func TestConnectivity(t *testing.T) {
skipIfNumNodesLessThan(t, 2)
testPingLargeMTU(t, data)
})
t.Run("testWindowsPodConnectivityAfterAntreaRestart", func(t *testing.T) {
skipIfNoWindowsNodes(t)
testWindowsPodConnectivityAfterAntreaRestart(t, data)
})
}

func waitForPodIPs(t *testing.T, data *TestData, podInfos []PodInfo) map[string]*PodIPs {
@@ -121,6 +125,100 @@ func (data *TestData) runPingMesh(t *testing.T, podInfos []PodInfo, ctrname stri
}
}

// testWindowsPodConnectivityAfterAntreaRestart checks Pod connectivity after antrea-agent is restarted on a Windows Node.
// We test both the regular Pod case and the host-network Pod case, because the CNI on Windows is also
// responsible for a host-network Pod's networking as long as it is not using host-process containers.
func testWindowsPodConnectivityAfterAntreaRestart(t *testing.T, data *TestData) {
linuxWorkerNode := clusterInfo.controlPlaneNodeName
linuxPodName := randName("test-pod-")
clientPod := PodInfo{
Name: linuxPodName,
Namespace: data.testNamespace,
NodeName: linuxWorkerNode,
OS: "linux",
}

t.Logf("Creating Linux Pod %s on Node '%s'", linuxPodName, linuxWorkerNode)
if err := data.createToolboxPodOnNode(clientPod.Name, clientPod.Namespace, clientPod.NodeName, false); err != nil {
t.Fatalf("Error when creating Pod '%s': %v", clientPod.Name, err)
}
defer deletePodWrapper(t, data, clientPod.Namespace, clientPod.Name)

t.Run("testGenericPodConnectivity", func(t *testing.T) {
data.verifyWindowsPodConnectivity(t, clientPod, false)
})
t.Run("testHostNetworkPodConnectivity", func(t *testing.T) {
data.verifyWindowsPodConnectivity(t, clientPod, true)
})
}

func (data *TestData) dumpOVSFlows(t *testing.T, workerNode string) []string {
ovsOfctlCmd := "ovs-ofctl"
if clusterInfo.nodesOS[workerNode] == "windows" {
ovsOfctlCmd = `c:/openvswitch/usr/bin/ovs-ofctl.exe`
}
cmd := []string{ovsOfctlCmd, "dump-flows", defaultBridgeName, "--names"}
antreaPodName, err := data.getAntreaPodOnNode(workerNode)
if err != nil {
t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", workerNode, err)
}
stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
}
flows := make([]string, 0)
for _, flow := range strings.Split(stdout, "\n") {
flow = strings.TrimSpace(flow)
if flow == "" {
continue
}
flows = append(flows, flow)
}
t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", len(flows), defaultBridgeName, workerNode)
return flows
}

func (data *TestData) verifyWindowsPodConnectivity(t *testing.T, clientPod PodInfo, useHostNetwork bool) {
winPodName := randName("test-pod-")
winWorkerNode := workerNodeName(clusterInfo.windowsNodes[0])
winPod := PodInfo{
Name: winPodName,
Namespace: data.testNamespace,
NodeName: winWorkerNode,
OS: "windows",
}
t.Logf("Creating Windows Pod %s on Node '%s'", winPodName, winWorkerNode)
if err := data.createToolboxPodOnNode(winPod.Name, winPod.Namespace, winPod.NodeName, useHostNetwork); err != nil {
t.Fatalf("Error when creating Pod '%s': %v", winPodName, err)
}
defer deletePodWrapper(t, data, winPod.Namespace, winPod.Name)

testPodInfos := []PodInfo{clientPod, winPod}

// Verify Pod connectivity before agent restart
data.runPingMesh(t, testPodInfos, toolboxContainerName, true)

// Count the OVS flows.
initialOVSFlows := data.dumpOVSFlows(t, winWorkerNode)

// Restart Antrea agent Pods
err := data.RestartAntreaAgentPods(defaultTimeout)
assert.NoError(t, err)

// Wait until the agent completes reconciliation and OpenFlow replay.
err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) {
newOVSFlows := data.dumpOVSFlows(t, winWorkerNode)
if len(newOVSFlows) != len(initialOVSFlows) {
return false, nil
}
return true, nil
})
assert.NoErrorf(t, err, "The OpenFlow entries should be consistent after the Antrea agent restarts on Windows Node %s", winWorkerNode)

// Verify Pod connectivity after agent restart
data.runPingMesh(t, testPodInfos, toolboxContainerName, true)
}

func (data *TestData) testPodConnectivitySameNode(t *testing.T) {
numPods := 2 // can be increased
podInfos := make([]PodInfo, numPods)
@@ -411,24 +509,6 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName)

dumpFlows := func() []string {
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName, "--names"}
stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
}
flows := make([]string, 0)
for _, flow := range strings.Split(stdout, "\n") {
flow = strings.TrimSpace(flow)
if flow == "" {
continue
}
flows = append(flows, flow)
}
count := len(flows)
t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return flows
}
dumpGroups := func() []string {
cmd := []string{"ovs-ofctl", "dump-groups", defaultBridgeName}
stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
Expand All @@ -449,7 +529,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
return groups
}

flows1, groups1 := dumpFlows(), dumpGroups()
flows1, groups1 := data.dumpOVSFlows(t, workerNode), dumpGroups()
numFlows1, numGroups1 := len(flows1), len(groups1)

// This is necessary because "ovs-ctl restart" saves and restores OpenFlow flows for the
@@ -486,7 +566,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
t.Logf("Running second ping mesh to check that flows have been restored")
data.runPingMesh(t, podInfos, toolboxContainerName, true)

flows2, groups2 := dumpFlows(), dumpGroups()
flows2, groups2 := data.dumpOVSFlows(t, workerNode), dumpGroups()
numFlows2, numGroups2 := len(flows2), len(groups2)
if !assert.Equal(t, numFlows1, numFlows2, "Mismatch in OVS flow count after flow replay") {
fmt.Println("Flows before replay:")
