Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdsbalancer: switch cluster watch to generic xDS client API #6600

Merged
merged 8 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus
// cluster resource. The test verifies that the load balancing configuration
// pushed to the cluster_resolver LB policy is contains the expected discovery
// mechanism corresponding to the leaf cluster, on both occasions.
func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) {
func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
tests := []struct {
name string
firstClusterResource *v3clusterpb.Cluster
Expand Down Expand Up @@ -657,9 +657,11 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
// Tests the scenario where the aggregate cluster graph has a node that has
// child node of itself. The case for this is A -> A, and since there is no base
// cluster (EDS or Logical DNS), no configuration should be pushed to the child
// policy. Then the test updates A -> B, where B is a leaf EDS cluster. Verifies
// that configuration is pushed to the child policy and that an RPC can be
// successfully made.
// policy. The channel is expected to move to TRANSIENT_FAILURE and RPCs are
// expected to fail with code UNAVAILABLE and an error message specifying that
// the aggregate cluster grpah no leaf clusters. Then the test updates A -> B,
// where B is a leaf EDS cluster. Verifies that configuration is pushed to the
// child policy and that an RPC can be successfully made.
func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
Expand Down Expand Up @@ -687,6 +689,19 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
case <-time.After(defaultTestShortTimeout):
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Verify that the RPC fails with expected code.
client := testgrpc.NewTestServiceClient(cc)
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
}
const wantErr = "aggregate cluster graph has no leaf clusters"
if !strings.Contains(err.Error(), wantErr) {
t.Fatalf("EmptyCall() failed with err: %v, want error containing %s", err, wantErr)
}

// Start a test service backend.
server := stubserver.StartTestService(t, nil)
t.Cleanup(server.Stop)
Expand Down Expand Up @@ -719,6 +734,111 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
t.Fatal(err)
}

// Verify that a successful RPC can be made.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
}

// Tests the scenario where the aggregate cluster graph contains a cycle and
// contains no leaf clusters. The case used here is [A -> B, B -> A]. As there
// are no leaf clusters in this graph, no configuration should be pushed to the
// child policy. The channel is expected to move to TRANSIENT_FAILURE and RPCs
// are expected to fail with code UNAVAILABLE and an error message specifying
// that the aggregate cluster graph has no leaf clusters.
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)

const (
clusterNameA = clusterName // cluster name in cds LB policy config
clusterNameB = clusterName + "-B"
)
// Configure the management server with an aggregate cluster resource graph
// that contains a cycle and no leaf clusters.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
makeAggregateClusterResource(clusterNameB, []string{clusterNameA}),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

select {
case cfg := <-lbCfgCh:
t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
case <-time.After(defaultTestShortTimeout):
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Verify that the RPC fails with expected code.
client := testgrpc.NewTestServiceClient(cc)
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
}
const wantErr = "aggregate cluster graph has no leaf clusters"
if !strings.Contains(err.Error(), wantErr) {
t.Fatalf("EmptyCall() failed with err: %v, want %s", err, wantErr)
}
}

// Tests the scenario where the aggregate cluster graph contains a cycle and
// also contains a leaf cluster. The case used here is [A -> B, B -> A, C]. As
// there is a leaf cluster in this graph , configuration should be pushed to the
// child policy and RPCs should get routed to that leaf cluster.
func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)

// Start a test service backend.
server := stubserver.StartTestService(t, nil)
t.Cleanup(server.Stop)

const (
clusterNameA = clusterName // cluster name in cds LB policy config
clusterNameB = clusterName + "-B"
clusterNameC = clusterName + "-C"
)
// Configure the management server with an aggregate cluster resource graph
// that contains a cycle, but also contains a leaf cluster.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
makeAggregateClusterResource(clusterNameB, []string{clusterNameA, clusterNameC}),
e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Verify the configuration pushed to the child policy.
wantChildCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
Cluster: clusterNameC,
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}
if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
t.Fatal(err)
}

// Verify that a successful RPC can be made.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
Expand Down
Loading