diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go
index 776b80ac829..bbba8738472 100644
--- a/pkg/agent/cniserver/pod_configuration.go
+++ b/pkg/agent/cniserver/pod_configuration.go
@@ -445,10 +445,6 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
 	var podWg sync.WaitGroup
 	for _, pod := range pods {
-		// Skip Pods for which we are not in charge of the networking.
-		if pod.Spec.HostNetwork {
-			continue
-		}
 		desiredPods.Insert(k8s.NamespacedName(pod.Namespace, pod.Name))
 		for _, podIP := range pod.Status.PodIPs {
 			desiredPodIPs.Insert(podIP.IP)
diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go
index 253c9ec2065..53b216d2a97 100644
--- a/pkg/agent/cniserver/server.go
+++ b/pkg/agent/cniserver/server.go
@@ -29,7 +29,6 @@ import (
 	"github.com/containernetworking/cni/pkg/version"
 	"github.com/containernetworking/plugins/pkg/ip"
 	"google.golang.org/grpc"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
@@ -756,14 +755,15 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
 // reconcile performs startup reconciliation for the CNI server. The CNI server is in charge of
 // installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
 // K8s apiserver and replay the necessary flows.
+// The Pods are processed in reconcile as follows:
+// | Pod Type                     | Spec.HostNetwork | windowsOptions.hostProcess | OVS interface needed? | Listed in reconcile? |
+// | Normal Pod (non-HostNetwork) | false            | false or N/A               | Yes                   | Yes                  |
+// | Linux HostNetwork Pod        | true             | N/A                        | No                    | No                   |
+// | Windows HostNetwork Pod      | true             | false                      | Yes                   | Yes                  |
+// | Windows HostProcess Pod      | true             | true                       | No                    | Yes                  |
 func (s *CNIServer) reconcile() error {
 	klog.InfoS("Starting reconciliation for CNI server")
-	// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
-	// the watch cache in kube-apiserver.
-	pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
-		FieldSelector: "spec.nodeName=" + s.nodeConfig.Name,
-		ResourceVersion: "0",
-	})
+	pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), s.getPodsListOptions())
 	if err != nil {
 		return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
 	}
diff --git a/pkg/agent/cniserver/server_linux.go b/pkg/agent/cniserver/server_linux.go
index 0e21a557940..9f3f8db76b1 100644
--- a/pkg/agent/cniserver/server_linux.go
+++ b/pkg/agent/cniserver/server_linux.go
@@ -14,7 +14,12 @@ package cniserver
 
-import current "github.com/containernetworking/cni/pkg/types/100"
+import (
+	"fmt"
+
+	current "github.com/containernetworking/cni/pkg/types/100"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
 
 // updateResultDNSConfig updates the DNS config from CNIConfig.
 func updateResultDNSConfig(result *current.Result, cniConfig *CNIConfig) {
@@ -48,3 +53,13 @@ func validateRuntime(netNS string) error {
 func (c *CNIConfig) getInfraContainer() string {
 	return c.ContainerId
 }
+
+// getPodsListOptions returns the ListOptions for listing the non-host-network Pods running on the current Node.
+func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
+	return metav1.ListOptions{
+		FieldSelector: fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", s.nodeConfig.Name),
+		// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
+		// the watch cache in kube-apiserver.
+		ResourceVersion: "0",
+	}
+}
diff --git a/pkg/agent/cniserver/server_windows.go b/pkg/agent/cniserver/server_windows.go
index 794c10e1d66..b45b5587ca9 100644
--- a/pkg/agent/cniserver/server_windows.go
+++ b/pkg/agent/cniserver/server_windows.go
@@ -22,6 +22,7 @@ import (
 	"strings"
 
 	current "github.com/containernetworking/cni/pkg/types/100"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog/v2"
 )
 
@@ -98,3 +99,15 @@ func getInfraContainer(containerID, netNS string) string {
 func (c *CNIConfig) getInfraContainer() string {
 	return getInfraContainer(c.ContainerId, c.Netns)
 }
+
+// getPodsListOptions returns the ListOptions for listing the Pods running on the current Node. Note that host-network
+// Pods are not filtered out on Windows, because they are also managed by Antrea as long as
+// "spec.SecurityContext.windowsOptions.hostProcess" is not configured.
+func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
+	return metav1.ListOptions{
+		FieldSelector: fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name),
+		// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
+		// the watch cache in kube-apiserver.
+		ResourceVersion: "0",
+	}
+}
diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go
index 636d399ae71..42eb7bc5048 100644
--- a/pkg/agent/cniserver/server_windows_test.go
+++ b/pkg/agent/cniserver/server_windows_test.go
@@ -46,6 +46,7 @@ import (
 	"antrea.io/antrea/pkg/ovs/ovsconfig"
 	ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
 	"antrea.io/antrea/pkg/util/channel"
+	utilip "antrea.io/antrea/pkg/util/ip"
 )
 
 var (
@@ -53,6 +54,22 @@ var (
 	dnsSearches = []string{"a.b.c.d"}
 
 	mockWinnet *winnettest.MockInterface
+
+	interfaceForHostNetworkPod = &interfacestore.InterfaceConfig{
+		InterfaceName: "iface2",
+		Type:          interfacestore.ContainerInterface,
+		IPs:           []net.IP{net.ParseIP("1.1.1.2")},
+		MAC:           utilip.MustParseMAC("00:11:22:33:44:02"),
+		OVSPortConfig: &interfacestore.OVSPortConfig{
+			PortUUID: generateUUID(),
+			OFPort:   int32(4),
+		},
+		ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
+			PodName:      pod2.Name,
+			PodNamespace: testPodNamespace,
+			ContainerID:  generateUUID(),
+		},
+	}
 )
 
 func TestUpdateResultDNSConfig(t *testing.T) {
@@ -732,7 +749,7 @@ func TestReconcile(t *testing.T) {
 	cniServer := newCNIServer(t)
 	cniServer.routeClient = mockRoute
 	cniServer.kubeClient = kubeClient
-	for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} {
+	for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface, interfaceForHostNetworkPod} {
 		ifaceStore.AddInterface(containerIface)
 	}
 	waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh)
@@ -741,11 +758,19 @@ func TestReconcile(t *testing.T) {
 	go cniServer.podConfigurator.Run(stopCh)
 
 	// Re-install Pod1 flows
-	podFlowsInstalled := make(chan string, 2)
+	expReinstalledPodCount := 3
+	podFlowsInstalled := make(chan string, expReinstalledPodCount)
 	mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC,
		uint32(normalInterface.OFPort), uint16(0), nil).
		Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
			podFlowsInstalled <- interfaceName
		}).Times(1)
+
+	// Re-install host-network Pod (Pod2) flows
+	mockOFClient.EXPECT().InstallPodFlows(interfaceForHostNetworkPod.InterfaceName, interfaceForHostNetworkPod.IPs, interfaceForHostNetworkPod.MAC, uint32(interfaceForHostNetworkPod.OFPort), uint16(0), nil).
+		Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
+			podFlowsInstalled <- interfaceName
+		}).Times(1)
+
 	// Uninstall Pod3 flows which is deleted.
 	mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1)
 	mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1)
@@ -778,7 +803,7 @@ func TestReconcile(t *testing.T) {
 	assert.NoError(t, err)
 	_, exists := ifaceStore.GetInterfaceByName("iface3")
 	assert.False(t, exists)
-	for i := 0; i < 2; i++ {
+	for i := 0; i < expReinstalledPodCount; i++ {
 		select {
 		case <-podFlowsInstalled:
 		case <-time.After(500 * time.Millisecond):
diff --git a/test/e2e/connectivity_test.go b/test/e2e/connectivity_test.go
index 6c6c33c0ff3..6ede5dc966b 100644
--- a/test/e2e/connectivity_test.go
+++ b/test/e2e/connectivity_test.go
@@ -70,6 +70,10 @@ func TestConnectivity(t *testing.T) {
 		skipIfNumNodesLessThan(t, 2)
 		testPingLargeMTU(t, data)
 	})
+	t.Run("testWindowsPodConnectivityAfterAntreaRestart", func(t *testing.T) {
+		skipIfNoWindowsNodes(t)
+		testWindowsPodConnectivityAfterAntreaRestart(t, data)
+	})
 }
 
 func waitForPodIPs(t *testing.T, data *TestData, podInfos []PodInfo) map[string]*PodIPs {
@@ -121,6 +125,100 @@ func (data *TestData) runPingMesh(t *testing.T, podInfos []PodInfo, ctrname stri
 	}
 }
 
+// testWindowsPodConnectivityAfterAntreaRestart checks Pod connectivity after antrea-agent is restarted on a Windows Node.
+// We test both the regular Pod case and the host-network Pod case, because the CNI on Windows is also
+// responsible for a host-network Pod's networking as long as the Pod does not use host-process containers.
+func testWindowsPodConnectivityAfterAntreaRestart(t *testing.T, data *TestData) {
+	linuxWorkerNode := clusterInfo.controlPlaneNodeName
+	linuxPodName := randName("test-pod-")
+	clientPod := PodInfo{
+		Name:      linuxPodName,
+		Namespace: data.testNamespace,
+		NodeName:  linuxWorkerNode,
+		OS:        "linux",
+	}
+
+	t.Logf("Creating Linux Pod %s on Node '%s'", linuxPodName, linuxWorkerNode)
+	if err := data.createToolboxPodOnNode(clientPod.Name, clientPod.Namespace, clientPod.NodeName, false); err != nil {
+		t.Fatalf("Error when creating Pod '%s': %v", clientPod.Name, err)
+	}
+	defer deletePodWrapper(t, data, clientPod.Namespace, clientPod.Name)
+
+	t.Run("testGenericPodConnectivity", func(t *testing.T) {
+		data.verifyWindowsPodConnectivity(t, clientPod, false)
+	})
+	t.Run("testHostNetworkPodConnectivity", func(t *testing.T) {
+		data.verifyWindowsPodConnectivity(t, clientPod, true)
+	})
+}
+
+func (data *TestData) dumpOVSFlows(t *testing.T, workerNode string) []string {
+	ovsOfctlCmd := "ovs-ofctl"
+	if clusterInfo.nodesOS[workerNode] == "windows" {
+		ovsOfctlCmd = `c:/openvswitch/usr/bin/ovs-ofctl.exe`
+	}
+	cmd := []string{ovsOfctlCmd, "dump-flows", defaultBridgeName, "--names"}
+	antreaPodName, err := data.getAntreaPodOnNode(workerNode)
+	if err != nil {
+		t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", workerNode, err)
+	}
+	stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
+	if err != nil {
+		t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
+	}
+	flows := make([]string, 0)
+	for _, flow := range strings.Split(stdout, "\n") {
+		flow = strings.TrimSpace(flow)
+		if flow == "" {
+			continue
+		}
+		flows = append(flows, flow)
+	}
+	t.Logf("Counted %d flows in OVS bridge '%s' for Node '%s'", len(flows), defaultBridgeName, workerNode)
+	return flows
+}
+
+func (data *TestData) verifyWindowsPodConnectivity(t *testing.T, clientPod PodInfo, useHostNetwork bool) {
+	winPodName := randName("test-pod-")
+	winWorkerNode := workerNodeName(clusterInfo.windowsNodes[0])
+	winPod := PodInfo{
+		Name:      winPodName,
+		Namespace: data.testNamespace,
+		NodeName:  winWorkerNode,
+		OS:        "windows",
+	}
+	t.Logf("Creating Windows Pod %s on Node '%s'", winPodName, winWorkerNode)
+	if err := data.createToolboxPodOnNode(winPod.Name, winPod.Namespace, winPod.NodeName, useHostNetwork); err != nil {
+		t.Fatalf("Error when creating Pod '%s': %v", winPodName, err)
+	}
+	defer deletePodWrapper(t, data, winPod.Namespace, winPod.Name)
+
+	testPodInfos := []PodInfo{clientPod, winPod}
+
+	// Verify Pod connectivity before the agent restart.
+	data.runPingMesh(t, testPodInfos, toolboxContainerName, true)
+
+	// Count the OVS flows.
+	initialOVSFlows := data.dumpOVSFlows(t, winWorkerNode)
+
+	// Restart the Antrea agent Pods.
+	err := data.RestartAntreaAgentPods(defaultTimeout)
+	assert.NoError(t, err)
+
+	// Wait until the agent completes reconciliation and OpenFlow replay.
+	err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) {
+		newOVSFlows := data.dumpOVSFlows(t, winWorkerNode)
+		if len(newOVSFlows) != len(initialOVSFlows) {
+			return false, nil
+		}
+		return true, nil
+	})
+	assert.NoErrorf(t, err, "The OpenFlow entries should be consistent after the Antrea agent restarts on Windows Node %s", winWorkerNode)
+
+	// Verify Pod connectivity after the agent restart.
+	data.runPingMesh(t, testPodInfos, toolboxContainerName, true)
+}
+
 func (data *TestData) testPodConnectivitySameNode(t *testing.T) {
 	numPods := 2 // can be increased
 	podInfos := make([]PodInfo, numPods)
@@ -411,24 +509,6 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
 	}
 	t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName)
 
-	dumpFlows := func() []string {
-		cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName, "--names"}
-		stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
-		if err != nil {
-			t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
-		}
-		flows := make([]string, 0)
-		for _, flow := range strings.Split(stdout, "\n") {
-			flow = strings.TrimSpace(flow)
-			if flow == "" {
-				continue
-			}
-			flows = append(flows, flow)
-		}
-		count := len(flows)
-		t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
-		return flows
-	}
 	dumpGroups := func() []string {
 		cmd := []string{"ovs-ofctl", "dump-groups", defaultBridgeName}
 		stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
@@ -449,7 +529,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
 		return groups
 	}
 
-	flows1, groups1 := dumpFlows(), dumpGroups()
+	flows1, groups1 := data.dumpOVSFlows(t, workerNode), dumpGroups()
 	numFlows1, numGroups1 := len(flows1), len(groups1)
 
 	// This is necessary because "ovs-ctl restart" saves and restores OpenFlow flows for the
@@ -486,7 +566,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
 	t.Logf("Running second ping mesh to check that flows have been restored")
 	data.runPingMesh(t, podInfos, toolboxContainerName, true)
 
-	flows2, groups2 := dumpFlows(), dumpGroups()
+	flows2, groups2 := data.dumpOVSFlows(t, workerNode), dumpGroups()
 	numFlows2, numGroups2 := len(flows2), len(groups2)
 	if !assert.Equal(t, numFlows1, numFlows2, "Mismatch in OVS flow count after flow replay") {
 		fmt.Println("Flows before replay:")