Skip to content

Commit

Permalink
feat: support weight 0 serviceExport (#261)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiying-lin authored Feb 19, 2025
1 parent de7b3f0 commit 1db34bd
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/controllers/hub/trafficmanagerbackend/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,9 @@ func isValidTrafficManagerEndpoint(export *fleetnetv1alpha1.InternalServiceExpor
if export.Spec.IsInternalLoadBalancer {
return fmt.Errorf("internal load balancer is not supported")
}
if export.Spec.PublicIPResourceID == nil {
return fmt.Errorf("in the processing of configuring public IP")
}
if !export.Spec.IsDNSLabelConfigured {
return fmt.Errorf("DNS label is not configured to the public IP")
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/hub/trafficmanagerbackend/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestIsValidTrafficManagerEndpoint(t *testing.T) {
export: &fleetnetv1alpha1.InternalServiceExport{
Spec: fleetnetv1alpha1.InternalServiceExportSpec{
Type: corev1.ServiceTypeLoadBalancer,
PublicIPResourceID: ptr.To("abc"),
IsDNSLabelConfigured: true,
IsInternalLoadBalancer: false,
},
Expand Down Expand Up @@ -56,6 +57,17 @@ func TestIsValidTrafficManagerEndpoint(t *testing.T) {
},
{
name: "load balancer type with public ip but dns label not configured",
export: &fleetnetv1alpha1.InternalServiceExport{
Spec: fleetnetv1alpha1.InternalServiceExportSpec{
Type: corev1.ServiceTypeLoadBalancer,
PublicIPResourceID: ptr.To("abc"),
IsDNSLabelConfigured: false,
},
},
wantErr: true,
},
{
name: "load balancer type with public ip but public ip is not ready",
export: &fleetnetv1alpha1.InternalServiceExport{
Spec: fleetnetv1alpha1.InternalServiceExportSpec{
Type: corev1.ServiceTypeLoadBalancer,
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/hub/trafficmanagerbackend/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
Expand Down Expand Up @@ -71,6 +72,7 @@ var (
NamespacedName: fmt.Sprintf("%s/%s", testNamespace, serviceName),
},
Type: corev1.ServiceTypeLoadBalancer,
PublicIPResourceID: ptr.To("abc"),
IsDNSLabelConfigured: true,
},
},
Expand Down Expand Up @@ -152,6 +154,7 @@ var (
NamespacedName: fmt.Sprintf("%s/%s", testNamespace, serviceName),
},
Type: corev1.ServiceTypeLoadBalancer,
PublicIPResourceID: ptr.To("abc"),
IsDNSLabelConfigured: true,
},
},
Expand Down Expand Up @@ -180,6 +183,7 @@ var (
NamespacedName: fmt.Sprintf("%s/%s", testNamespace, serviceName),
},
Type: corev1.ServiceTypeLoadBalancer,
PublicIPResourceID: ptr.To("abc"),
IsDNSLabelConfigured: true,
},
},
Expand Down Expand Up @@ -208,6 +212,7 @@ var (
NamespacedName: fmt.Sprintf("%s/%s", testNamespace, serviceName),
},
Type: corev1.ServiceTypeLoadBalancer,
PublicIPResourceID: ptr.To("abc"),
IsDNSLabelConfigured: true,
},
},
Expand Down
9 changes: 9 additions & 0 deletions pkg/controllers/member/internalserviceexport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, err
}

if !internalSvcExport.ObjectMeta.DeletionTimestamp.IsZero() {
// Skip the reconciliation if the InternalServiceExport is being deleted.
// There is no need to report the conflicts back.
// For example, the serviceExport is no longer valid or valid with 0 weight.
// In these cases, there is no need to create internalServiceExport.
klog.V(2).InfoS("Ignoring deleting internalServiceExport", "internalServiceExport", internalSvcExportRef)
return ctrl.Result{}, nil
}

// Check if the exported Service exists.
svcNS := internalSvcExport.Spec.ServiceReference.Namespace
svcName := internalSvcExport.Spec.ServiceReference.Name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/metrics"
Expand All @@ -25,6 +26,9 @@ const (

eventuallyTimeout = time.Second * 10
eventuallyInterval = time.Millisecond * 250

consistentlyDuration = time.Second * 15
consistentlyInterval = time.Millisecond * 250
)

var (
Expand Down Expand Up @@ -247,4 +251,53 @@ var _ = Describe("internalsvcexport controller", func() {
Eventually(internalServiceExportHasLastObservedResourceVersionAnnotatedActual, eventuallyTimeout, eventuallyInterval).Should(BeNil())
})
})

Context("internalserviceexport is deleting", func() {
var svcExport *fleetnetv1alpha1.ServiceExport
var internalSvcExport *fleetnetv1alpha1.InternalServiceExport
finalizer := "internal-service-export-finalizer"

BeforeEach(func() {
svcExport = unfulfilledServiceExport()
Expect(memberClient.Create(ctx, svcExport)).Should(Succeed())

internalSvcExport = unfulfilledInternalServiceExport()
controllerutil.AddFinalizer(internalSvcExport, finalizer) // so that the internalserviceexport is not deleted immediately
Expect(hubClient.Create(ctx, internalSvcExport)).Should(Succeed())

By("Deleting internalServiceExport")
Expect(hubClient.Delete(ctx, internalSvcExport)).Should(Succeed())
})

AfterEach(func() {
Expect(hubClient.Get(ctx, internalSvcExportKey, internalSvcExport)).Should(Succeed())
controllerutil.RemoveFinalizer(internalSvcExport, finalizer)
Expect(hubClient.Update(ctx, internalSvcExport)).Should(Succeed())

Expect(memberClient.Delete(ctx, svcExport)).Should(Succeed())

// Confirm that both ServiceExport and InternalServiceExport have been deleted;
// this helps make the test less flaky.
Eventually(internalServiceExportIsAbsentActual, eventuallyTimeout, eventuallyInterval).Should(BeNil())
Eventually(serviceExportIsAbsentActual, eventuallyTimeout, eventuallyInterval).Should(BeNil())
})

It("should not report back conflict condition (conflict found)", func() {
// Add a conflict condition
Expect(hubClient.Get(ctx, internalSvcExportKey, internalSvcExport)).Should(Succeed())
meta.SetStatusCondition(&internalSvcExport.Status.Conditions,
conflictedServiceExportConflictCondition(memberUserNS, svcName))
Expect(hubClient.Status().Update(ctx, internalSvcExport)).Should(Succeed())

Consistently(func() error {
if err := memberClient.Get(ctx, svcExportKey, svcExport); err != nil {
return fmt.Errorf("serviceExport Get(%+v), got %w, want no error", svcExportKey, err)
}
if len(svcExport.Status.Conditions) != 0 {
return fmt.Errorf("serviceExport conditions got %+v, want empty", svcExport.Status.Conditions)
}
return nil
}, consistentlyDuration, consistentlyInterval).Should(BeNil())
})
})
})
29 changes: 29 additions & 0 deletions pkg/controllers/member/serviceexport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Get the weight from the serviceExport annotation and validate it.
exportWeight, err := extractWeightFromServiceExport(&svcExport)
if err != nil {
// Here we don't unexport the service to interrupt the traffic when using invalid annotation.
klog.ErrorS(err, "service export has invalid annotation weight", "service", svcRef)
validCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
expectedValidCond := metav1.Condition{
Expand All @@ -193,6 +194,34 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, r.MemberClient.Status().Update(ctx, &svcExport)
}

if exportWeight == 0 {
// The weight is 0, unexport the service.
klog.V(2).InfoS("Service has weight 0; unexport the service", "service", svcRef)
r.Recorder.Eventf(&svcExport, corev1.EventTypeNormal, "Service", "Service %s weight is set to 0", svc.Name)

if controllerutil.ContainsFinalizer(&svcExport, svcExportCleanupFinalizer) {
if _, err = r.unexportService(ctx, &svcExport); err != nil {
klog.ErrorS(err, "Failed to unexport the service", "service", svcRef)
return ctrl.Result{}, err
}
}
validCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
expectedValidCond := metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportValid),
Status: metav1.ConditionTrue,
Reason: svcExportValidCondReason,
ObservedGeneration: svcExport.Generation,
Message: fmt.Sprintf("Exported service %s/%s with 0 weight", svcExport.Namespace, svcExport.Name),
}
// Since the annotation won't change the generation, we compare the message here.
if condition.EqualConditionWithMessage(validCond, &expectedValidCond) {
// no need to retry if the condition is already set
return ctrl.Result{}, nil
}
meta.SetStatusCondition(&svcExport.Status.Conditions, expectedValidCond)
return ctrl.Result{}, r.MemberClient.Status().Update(ctx, &svcExport)
}

// Add the cleanup finalizer to the ServiceExport; this must happen before the Service is actually exported.
if !controllerutil.ContainsFinalizer(&svcExport, svcExportCleanupFinalizer) {
klog.V(2).InfoS("Add cleanup finalizer to service export", "service", svcRef)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,10 @@ var _ = Describe("serviceexport controller", func() {

By("update the serviceExport in the member cluster")
Expect(memberClient.Get(ctx, svcOrSvcExportKey, svcExport)).Should(Succeed())
svcExport.Annotations = map[string]string{
objectmeta.ServiceExportAnnotationWeight: strconv.Itoa(3837),
if svcExport.Annotations == nil {
svcExport.Annotations = make(map[string]string)
}
svcExport.Annotations[objectmeta.ServiceExportAnnotationWeight] = strconv.Itoa(3837)
Expect(memberClient.Update(ctx, svcExport)).Should(Succeed())

By("make sure the serviceExport is marked as invalid")
Expand All @@ -566,6 +567,45 @@ var _ = Describe("serviceexport controller", func() {
}
return nil
}, eventuallyTimeout, eventuallyInterval).Should(Succeed())

By("make sure the service is still exported")
err = serviceIsExportedToHubActual(svc.Spec.Type, false, ptr.To(int64(weight)))()
Expect(err).Should(Succeed(), "Service is not exported to hub: %v", err)
})

It("annotation weight is set to zero", func() {
By("confirm that the service has been exported")
Eventually(serviceIsExportedFromMemberActual, eventuallyTimeout, eventuallyInterval).Should(Succeed())
Eventually(serviceIsExportedToHubActual(svc.Spec.Type, false, ptr.To(int64(weight))), eventuallyTimeout, eventuallyInterval).Should(Succeed())

By("update the serviceExport in the member cluster")
Expect(memberClient.Get(ctx, svcOrSvcExportKey, svcExport)).Should(Succeed())
svcExport.Annotations = map[string]string{
objectmeta.ServiceExportAnnotationWeight: strconv.Itoa(0),
}
Expect(memberClient.Update(ctx, svcExport)).Should(Succeed())

By("make sure the serviceExport is marked as valid")
expectedCond := metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportValid),
Status: metav1.ConditionTrue,
Reason: svcExportValidCondReason,
ObservedGeneration: svcExport.Generation,
Message: fmt.Sprintf("Exported service %s/%s with 0 weight", svcExport.Namespace, svcExport.Name),
}
Eventually(func() error {
svcExport := &fleetnetv1alpha1.ServiceExport{}
Expect(memberClient.Get(ctx, svcOrSvcExportKey, svcExport)).Should(Succeed())
validCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
if diff := cmp.Diff(validCond, &expectedCond, ignoredCondFields); diff != "" {
return fmt.Errorf("serviceExportValid condition (-got, +want): %s", diff)
}
return nil
}, eventuallyTimeout, eventuallyInterval).Should(Succeed(), "Get() serviceExport mismatch")

By("make sure the service is unexported")
err := serviceIsNotExportedActual()
Expect(err).Should(Succeed(), "Failed to unexport the service: %v", err)
})
})

Expand Down
2 changes: 1 addition & 1 deletion test/common/trafficmanager/validator/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

const (
timeout = time.Second * 120 // need more time to create azure resources
timeout = time.Second * 300 // need more time to create azure resources
interval = time.Millisecond * 250
// duration used by consistently
duration = time.Second * 30
Expand Down
36 changes: 36 additions & 0 deletions test/e2e/framework/workload_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -29,6 +31,9 @@ import (
"go.goms.io/fleet-networking/pkg/common/uniquename"
)

// ignoredCondFields are fields that should be ignored when comparing conditions.
var ignoredCondFields = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")

// WorkloadManager represents a suite of variables of operations required to test exporting an service and more.
type WorkloadManager struct {
Fleet *Fleet
Expand Down Expand Up @@ -248,6 +253,37 @@ func (wm *WorkloadManager) UpdateServiceType(ctx context.Context, cluster *Clust
return nil
}

// UpdateServiceExportWeight updates the service export weight in the member cluster.
func (wm *WorkloadManager) UpdateServiceExportWeight(ctx context.Context, cluster *Cluster, weight int) error {
var svcExport fleetnetv1alpha1.ServiceExport
if err := cluster.kubeClient.Get(ctx, types.NamespacedName{Namespace: wm.namespace, Name: wm.service.Name}, &svcExport); err != nil {
return fmt.Errorf("failed to get service export %s in cluster %s: %w", wm.service.Name, cluster.Name(), err)
}
if svcExport.Annotations == nil {
svcExport.Annotations = make(map[string]string)
}
svcExport.Annotations[objectmeta.ServiceExportAnnotationWeight] = fmt.Sprintf("%d", weight)
if err := cluster.kubeClient.Update(ctx, &svcExport); err != nil {
return fmt.Errorf("failed to update service export %s in cluster %s: %w", svcExport.Name, cluster.Name(), err)
}
return nil
}

// ValidateServiceExportCondition validates the service export condition in the member cluster.
// The function will update the `wantCondition` using the latest generation of the serviceExport.
func (wm *WorkloadManager) ValidateServiceExportCondition(ctx context.Context, cluster *Cluster, wantCondition metav1.Condition) error {
var svcExport fleetnetv1alpha1.ServiceExport
if err := cluster.kubeClient.Get(ctx, types.NamespacedName{Namespace: wm.namespace, Name: wm.service.Name}, &svcExport); err != nil {
return fmt.Errorf("failed to get service export %s in cluster %s: %w", wm.service.Name, cluster.Name(), err)
}
wantCondition.ObservedGeneration = svcExport.Generation
gotCondition := meta.FindStatusCondition(svcExport.Status.Conditions, wantCondition.Type)
if diff := cmp.Diff(gotCondition, &wantCondition, ignoredCondFields); diff != "" {
return fmt.Errorf("serviceExport condition (-got, +want): %s", diff)
}
return nil
}

// RemoveWorkload deletes workload(deployment and its service) from member clusters.
func (wm *WorkloadManager) RemoveWorkload(ctx context.Context) error {
for _, m := range wm.Fleet.MemberClusters() {
Expand Down
Loading

0 comments on commit 1db34bd

Please sign in to comment.