Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bind to pod ips when env variable ENABLE_BIND_LOCAL_IP is set to true #5049

Merged
merged 5 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/http/pprof"
"os"
"slices"
"time"

v1 "k8s.io/api/authorization/v1"
Expand Down Expand Up @@ -53,8 +54,8 @@ func CmdMain() {
ctrl.SetLogger(klog.NewKlogr())
ctx := signals.SetupSignalHandler()
go func() {
metricsAddr := util.GetDefaultListenAddr()
servePprofInMetricsServer := config.EnableMetrics && metricsAddr == "0.0.0.0"
metricsAddrs := util.GetDefaultListenAddr()
servePprofInMetricsServer := config.EnableMetrics && slices.Contains(metricsAddrs, "0.0.0.0")
if config.EnablePprof && !servePprofInMetricsServer {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
Expand Down Expand Up @@ -87,15 +88,19 @@ func CmdMain() {
if config.EnableMetrics {
metrics.InitKlogMetrics()
metrics.InitClientGoMetrics()
addr := util.JoinHostPort(metricsAddr, config.PprofPort)
if err := metrics.Run(ctx, config.KubeRestConfig, addr, config.SecureServing, servePprofInMetricsServer); err != nil {
util.LogFatalAndExit(err, "failed to run metrics server")
for _, metricsAddr := range metricsAddrs {
addr := util.JoinHostPort(metricsAddr, config.PprofPort)
go func() {
if err := metrics.Run(ctx, config.KubeRestConfig, addr, config.SecureServing, servePprofInMetricsServer); err != nil {
util.LogFatalAndExit(err, "failed to run metrics server")
}
}()
}
} else {
klog.Info("metrics server is disabled")
listerner, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(metricsAddr), Port: int(config.PprofPort)})
listerner, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(metricsAddrs[0]), Port: int(config.PprofPort)})
if err != nil {
util.LogFatalAndExit(err, "failed to listen on %s", util.JoinHostPort(metricsAddr, config.PprofPort))
util.LogFatalAndExit(err, "failed to listen on %s", util.JoinHostPort(metricsAddrs[0], config.PprofPort))
}
mux := http.NewServeMux()
mux.HandleFunc("/healthz", util.DefaultHealthCheckHandler)
Expand Down
46 changes: 26 additions & 20 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/pprof"
"os"
"path/filepath"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -99,24 +100,25 @@ func main() {
go ctl.Run(stopCh)
go daemon.RunServer(config, ctl)

addr := util.GetDefaultListenAddr()
addrs := util.GetDefaultListenAddr()
if config.EnableVerboseConnCheck {
go func() {
connListenaddr := util.JoinHostPort(addr, config.TCPConnCheckPort)
if err := util.TCPConnectivityListen(connListenaddr); err != nil {
util.LogFatalAndExit(err, "failed to start TCP listen on addr %s", addr)
}
}()

go func() {
connListenaddr := util.JoinHostPort(addr, config.UDPConnCheckPort)
if err := util.UDPConnectivityListen(connListenaddr); err != nil {
util.LogFatalAndExit(err, "failed to start UDP listen on addr %s", addr)
}
}()
for _, addr := range addrs {
go func() {
connListenaddr := util.JoinHostPort(addr, config.TCPConnCheckPort)
if err := util.TCPConnectivityListen(connListenaddr); err != nil {
util.LogFatalAndExit(err, "failed to start TCP listen on addr %s", addr)
}
}()
go func() {
connListenaddr := util.JoinHostPort(addr, config.UDPConnCheckPort)
if err := util.UDPConnectivityListen(connListenaddr); err != nil {
util.LogFatalAndExit(err, "failed to start UDP listen on addr %s", addr)
}
}()
}
}

servePprofInMetricsServer := config.EnableMetrics && addr == "0.0.0.0"
servePprofInMetricsServer := config.EnableMetrics && slices.Contains(addrs, "0.0.0.0")
if config.EnablePprof && !servePprofInMetricsServer {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
Expand Down Expand Up @@ -149,15 +151,19 @@ func main() {
if config.EnableMetrics {
daemon.InitMetrics()
metrics.InitKlogMetrics()
listenAddr := util.JoinHostPort(addr, config.PprofPort)
if err = metrics.Run(ctx, nil, listenAddr, config.SecureServing, servePprofInMetricsServer); err != nil {
util.LogFatalAndExit(err, "failed to run metrics server")
for _, addr := range addrs {
listenAddr := util.JoinHostPort(addr, config.PprofPort)
go func() {
if err := metrics.Run(ctx, nil, listenAddr, config.SecureServing, servePprofInMetricsServer); err != nil {
util.LogFatalAndExit(err, "failed to run metrics server")
}
}()
}
} else {
klog.Info("metrics server is disabled")
listerner, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(addr), Port: int(config.PprofPort)})
listerner, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(addrs[0]), Port: int(config.PprofPort)})
if err != nil {
util.LogFatalAndExit(err, "failed to listen on %s", util.JoinHostPort(addr, config.PprofPort))
util.LogFatalAndExit(err, "failed to listen on %s", util.JoinHostPort(addrs[0], config.PprofPort))
}
mux := http.NewServeMux()
mux.HandleFunc("/healthz", util.DefaultHealthCheckHandler)
Expand Down
16 changes: 10 additions & 6 deletions cmd/ovn_monitor/ovn_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,27 @@ func CmdMain() {
ctrl.SetLogger(klog.NewKlogr())
ctx := signals.SetupSignalHandler()

metricsAddr := util.GetDefaultListenAddr()
metricsAddrs := util.GetDefaultListenAddr()
if config.EnableMetrics {
exporter := ovn.NewExporter(config)
if err = exporter.StartConnection(); err != nil {
klog.Errorf("%s failed to connect db socket properly: %s", ovn.GetExporterName(), err)
go exporter.TryClientConnection()
}
exporter.StartOvnMetrics()
addr := util.JoinHostPort(metricsAddr, config.MetricsPort)
if err = metrics.Run(ctx, nil, addr, config.SecureServing, false); err != nil {
util.LogFatalAndExit(err, "failed to run metrics server")
for _, metricsAddr := range metricsAddrs {
addr := util.JoinHostPort(metricsAddr, config.MetricsPort)
go func() {
if err := metrics.Run(ctx, nil, addr, config.SecureServing, false); err != nil {
util.LogFatalAndExit(err, "failed to run metrics server")
}
}()
}
} else {
klog.Info("metrics server is disabled")
listerner, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(util.GetDefaultListenAddr()), Port: int(config.MetricsPort)})
listerner, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(metricsAddrs[0]), Port: int(config.MetricsPort)})
if err != nil {
util.LogFatalAndExit(err, "failed to listen on %s", util.JoinHostPort(metricsAddr, config.MetricsPort))
util.LogFatalAndExit(err, "failed to listen on %s", util.JoinHostPort(metricsAddrs[0], config.MetricsPort))
}
mux := http.NewServeMux()
mux.HandleFunc("/healthz", util.DefaultHealthCheckHandler)
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Configuration struct {
KubeClient kubernetes.Interface
KubeOvnClient clientset.Interface
NodeName string
NodeIPv4 string
NodeIPv6 string
ServiceClusterIPRange string
ClusterRouter string
NodeSwitch string
Expand Down Expand Up @@ -213,6 +215,7 @@ func (config *Configuration) initNicConfig(nicBridgeMappings map[string]string)
klog.Errorf("Failed to find node info, err: %v", err)
return err
}
config.NodeIPv4, config.NodeIPv6 = util.GetNodeInternalIP(*node)
if nodeTunnelName := node.GetAnnotations()[util.TunnelInterfaceAnnotation]; nodeTunnelName != "" {
config.Iface = nodeTunnelName
klog.Infof("Find node tunnel interface name: %v", nodeTunnelName)
Expand Down
23 changes: 7 additions & 16 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,14 +642,11 @@ func (c *Controller) setIptables() error {
{Table: MANGLE, Chain: OvnPostrouting, Rule: strings.Fields(`-p tcp -m set --match-set ovn60subnets src -m tcp --tcp-flags RST RST -m state --state INVALID -j DROP`)},
}
)
protocols := make([]string, 2)
isDual := false
protocols := make([]string, 0, 2)
if c.protocol == kubeovnv1.ProtocolDual {
protocols[0] = kubeovnv1.ProtocolIPv4
protocols[1] = kubeovnv1.ProtocolIPv6
isDual = true
protocols = append(protocols, kubeovnv1.ProtocolIPv4, kubeovnv1.ProtocolIPv6)
} else {
protocols[0] = c.protocol
protocols = append(protocols, c.protocol)
}

for _, protocol := range protocols {
Expand Down Expand Up @@ -837,7 +834,7 @@ func (c *Controller) setIptables() error {
return err
}

if err = c.reconcileTProxyIPTableRules(protocol, isDual); err != nil {
if err = c.reconcileTProxyIPTableRules(protocol); err != nil {
klog.Error(err)
return err
}
Expand Down Expand Up @@ -868,7 +865,7 @@ func (c *Controller) setIptables() error {
return nil
}

func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) error {
func (c *Controller) reconcileTProxyIPTableRules(protocol string) error {
if !c.config.EnableTProxy {
return nil
}
Expand Down Expand Up @@ -902,19 +899,13 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e
}

for _, probePort := range ports.SortedList() {
hostIP := pod.Status.HostIP
hostIP := c.config.NodeIPv4
prefixLen := 32
if protocol == kubeovnv1.ProtocolIPv6 {
prefixLen = 128
hostIP = c.config.NodeIPv6
}

if isDual || os.Getenv("ENABLE_BIND_LOCAL_IP") == "false" {
if protocol == kubeovnv1.ProtocolIPv4 {
hostIP = "0.0.0.0"
} else if protocol == kubeovnv1.ProtocolIPv6 {
hostIP = "::"
}
}
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
}
Expand Down
49 changes: 26 additions & 23 deletions pkg/daemon/tproxy_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,34 @@ var (
)

func (c *Controller) StartTProxyForwarding() {
protocol := "tcp"
addr := util.GetDefaultListenAddr()
if util.CheckProtocol(addr) == kubeovnv1.ProtocolIPv6 {
protocol = "tcp6"
}
for _, addr := range util.GetDefaultListenAddr() {
protocol := "tcp"
if util.CheckProtocol(addr) == kubeovnv1.ProtocolIPv6 {
protocol = "tcp6"
}

tcpListener, err := goTProxy.ListenTCP(protocol, &net.TCPAddr{IP: net.ParseIP(addr), Port: util.TProxyListenPort})
if err != nil {
klog.Fatalf("Encountered error while binding listener: %s", err)
return
}
go func() {
tcpListener, err := goTProxy.ListenTCP(protocol, &net.TCPAddr{IP: net.ParseIP(addr), Port: util.TProxyListenPort})
if err != nil {
klog.Fatalf("Encountered error while binding listener: %s", err)
return
}

defer func() {
if err := tcpListener.Close(); err != nil {
klog.Errorf("Error tcpListener Close err: %v", err)
}
}()
defer func() {
if err := tcpListener.Close(); err != nil {
klog.Errorf("Error tcpListener Close err: %v", err)
}
}()

for {
conn, err := tcpListener.Accept()
if err != nil {
klog.Fatalf("Unrecoverable error while accepting connection: %s", err)
return
}
go handleRedirectFlow(conn)
for {
conn, err := tcpListener.Accept()
if err != nil {
klog.Fatalf("Unrecoverable error while accepting connection: %s", err)
return
}
go handleRedirectFlow(conn)
}
}()
}
}

Expand Down Expand Up @@ -90,7 +93,7 @@ func getProbePorts(pod *corev1.Pod) set.Set[int32] {
}

ports.Delete(0)
klog.Infof("probe ports for pod %s/%s: %v", pod.Namespace, pod.Name, ports.SortedList())
klog.V(3).Infof("probe ports for pod %s/%s: %v", pod.Namespace, pod.Name, ports.SortedList())
return ports
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,13 +692,15 @@ func UDPConnectivityListen(endpoint string) error {
return nil
}

func GetDefaultListenAddr() string {
func GetDefaultListenAddr() []string {
if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" {
if ips := strings.Split(os.Getenv("POD_IPS"), ","); len(ips) == 1 {
return ips[0]
if podIPs := os.Getenv("POD_IPS"); podIPs != "" {
return strings.Split(podIPs, ",")
}
klog.Error("environment variable POD_IPS is not set, cannot bind to local ip")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
return "0.0.0.0"
return []string{"0.0.0.0"}
}

func ContainsUppercase(s string) bool {
Expand Down
15 changes: 11 additions & 4 deletions pkg/util/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,14 +1390,21 @@ func TestUDPConnectivityCheck(t *testing.T) {
}

func TestGetDefaultListenAddr(t *testing.T) {
addr := GetDefaultListenAddr()
require.Equal(t, addr, "0.0.0.0")
require.Equal(t, GetDefaultListenAddr(), []string{"0.0.0.0"})
err := os.Setenv("ENABLE_BIND_LOCAL_IP", "true")
require.NoError(t, err)
err = os.Setenv("POD_IPS", "10.10.10.10")
require.NoError(t, err)
addr = GetDefaultListenAddr()
require.Equal(t, addr, "10.10.10.10")
require.Equal(t, GetDefaultListenAddr(), []string{"10.10.10.10"})
err = os.Setenv("POD_IPS", "fd00::1")
require.NoError(t, err)
require.Equal(t, GetDefaultListenAddr(), []string{"fd00::1"})
err = os.Setenv("POD_IPS", "10.10.10.10,fd00::1")
require.NoError(t, err)
require.Equal(t, GetDefaultListenAddr(), []string{"10.10.10.10", "fd00::1"})
err = os.Setenv("ENABLE_BIND_LOCAL_IP", "false")
require.NoError(t, err)
require.Equal(t, GetDefaultListenAddr(), []string{"0.0.0.0"})
}

func TestContainsUppercase(t *testing.T) {
Expand Down
23 changes: 10 additions & 13 deletions test/e2e/kube-ovn/pod/vpc_pod_probe.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package pod

import (
"context"
"fmt"
"math/rand/v2"
"net"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -185,9 +187,12 @@ func checkTProxyRules(f *framework.Framework, pod *corev1.Pod, probePort int32,
ginkgo.GinkgoHelper()

nodeName := pod.Spec.NodeName
isZeroIP := false
if len(pod.Status.PodIPs) == 2 {
isZeroIP = true
node, err := f.ClientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
framework.ExpectNoError(err)

nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
if len(pod.Status.PodIPs) == 2 && f.VersionPriorTo(1, 13) {
nodeIPv4, nodeIPv6 = net.IPv4zero.String(), net.IPv6zero.String()
}

for _, podIP := range pod.Status.PodIPs {
Expand All @@ -196,12 +201,8 @@ func checkTProxyRules(f *framework.Framework, pod *corev1.Pod, probePort int32,
fmt.Sprintf(`-A OVN-OUTPUT -d %s/32 -p tcp -m tcp --dport %d -j MARK --set-xmark %s`, podIP.IP, probePort, tProxyOutputMarkMask),
}
iptables.CheckIptablesRulesOnNode(f, nodeName, util.Mangle, util.OvnOutput, apiv1.ProtocolIPv4, expectedRules, exist)
hostIP := pod.Status.HostIP
if isZeroIP {
hostIP = net.IPv4zero.String()
}
expectedRules = []string{
fmt.Sprintf(`-A OVN-PREROUTING -d %s/32 -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP.IP, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask),
fmt.Sprintf(`-A OVN-PREROUTING -d %s/32 -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP.IP, probePort, util.TProxyListenPort, nodeIPv4, tProxyPreRoutingMarkMask),
}
iptables.CheckIptablesRulesOnNode(f, nodeName, util.Mangle, util.OvnPrerouting, apiv1.ProtocolIPv4, expectedRules, exist)
} else if util.CheckProtocol(podIP.IP) == apiv1.ProtocolIPv6 {
Expand All @@ -210,12 +211,8 @@ func checkTProxyRules(f *framework.Framework, pod *corev1.Pod, probePort int32,
}
iptables.CheckIptablesRulesOnNode(f, nodeName, util.Mangle, util.OvnOutput, apiv1.ProtocolIPv6, expectedRules, exist)

hostIP := pod.Status.HostIP
if isZeroIP {
hostIP = "::"
}
expectedRules = []string{
fmt.Sprintf(`-A OVN-PREROUTING -d %s/128 -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP.IP, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask),
fmt.Sprintf(`-A OVN-PREROUTING -d %s/128 -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP.IP, probePort, util.TProxyListenPort, nodeIPv6, tProxyPreRoutingMarkMask),
}
iptables.CheckIptablesRulesOnNode(f, nodeName, util.Mangle, util.OvnPrerouting, apiv1.ProtocolIPv6, expectedRules, exist)
}
Expand Down
Loading
Loading