Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: introduce generic xds clients xDS and LRS Client API signatures #8042

Merged
merged 26 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
56459d6
xds: generic xds client common configs
purnesh42H Jan 21, 2025
1bb7909
re-push comments
purnesh42H Jan 22, 2025
03ea00d
improve ServerConfig equal
purnesh42H Jan 22, 2025
bc8d7a7
easwar review round 1 on documentation
purnesh42H Jan 23, 2025
053ff95
easwar comments on docstrings
purnesh42H Jan 24, 2025
4cd4ef1
easwar comments round 4
purnesh42H Jan 28, 2025
49f1e3b
config tests
purnesh42H Jan 28, 2025
835f9e4
merge with previous pr
purnesh42H Jan 31, 2025
f93c438
xds: add lrs client and xDS client interfaces
purnesh42H Jan 27, 2025
72b99b4
second pass to documentation language
purnesh42H Feb 3, 2025
77689c2
change from godoc review
purnesh42H Feb 4, 2025
0659a44
dfawley review 2
purnesh42H Feb 7, 2025
5454ab1
easwar review 1
purnesh42H Feb 10, 2025
c1a18cd
changed to decoder struct
purnesh42H Feb 11, 2025
077bdd0
move authorities under xds client
purnesh42H Feb 18, 2025
0dd84b6
easwar review 2
purnesh42H Feb 19, 2025
03b3e6d
easwars review 4
purnesh42H Feb 20, 2025
8f7a4df
ResourceWatcher done and LoadStore stop
purnesh42H Feb 25, 2025
48b05eb
doug
purnesh42H Feb 26, 2025
0418208
address godoc review comments
purnesh42H Feb 27, 2025
1569a11
dfawley nits
purnesh42H Feb 28, 2025
0d234e0
rebase over grpctransport changes
purnesh42H Feb 28, 2025
0608187
Move clients config helpers to clients/internal
purnesh42H Mar 3, 2025
833a19b
remove ReportLoadTimeout from lrs config
purnesh42H Mar 3, 2025
60166b7
doug final nits
purnesh42H Mar 4, 2025
b6ac539
remove ServerIdentifierEqual
purnesh42H Mar 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 13 additions & 125 deletions xds/internal/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*
*/

// Package clients provides implementations of the xDS and LRS clients,
// enabling applications to communicate with xDS management servers and report
// load.
// Package clients provides implementations of the clients to interact with
// xDS and LRS servers.
//
// xDS Client
// # xDS Client
//
// The xDS client allows applications to:
// - Create client instances with in-memory configurations.
Expand All @@ -41,100 +40,33 @@
//
// NOTICE: This package is EXPERIMENTAL and may be changed or removed
// in a later release.
//
// See [README](https://github.com/grpc/grpc-go/tree/master/xds/clients/README.md).
package clients

import (
"fmt"
"slices"
"strings"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
)

// ServerConfig holds settings for connecting to an xDS management server.
type ServerConfig struct {
// ServerURI is the target URI of the xDS management server.
// ServerIdentifier holds identifying information for connecting to an xDS
// management or LRS server.
type ServerIdentifier struct {
// ServerURI is the target URI of the server.
ServerURI string

// IgnoreResourceDeletion is a server feature which if set to true,
// indicates that resource deletion errors can be ignored and cached
// resource data can be used.
//
// This will be removed in the future once we implement gRFC A88
// and two new fields FailOnDataErrors and
// ResourceTimerIsTransientError will be introduced.
IgnoreResourceDeletion bool

// Extensions can be populated with arbitrary data to be passed to the
// [TransportBuilder] and/or xDS Client's ResourceType implementations.
// TransportBuilder and/or xDS Client's ResourceType implementations.
// This field can be used to provide additional configuration or context
// specific to the user's needs.
//
// The xDS and LRS clients do not interpret the contents of this field.
// It is the responsibility of the user's custom [TransportBuilder] and/or
// It is the responsibility of the user's custom TransportBuilder and/or
// ResourceType implementations to handle and interpret these extensions.
//
// For example, a custom [TransportBuilder] might use this field to
// For example, a custom TransportBuilder might use this field to
// configure a specific security credentials.
//
// Note: For custom types used in Extensions, ensure an Equal(any) bool
// method is implemented for equality checks on ServerConfig.
Extensions any
}

// equal returns true if sc and other are considered equal.
func (sc *ServerConfig) equal(other *ServerConfig) bool {
switch {
case sc == nil && other == nil:
return true
case (sc != nil) != (other != nil):
return false
case sc.ServerURI != other.ServerURI:
return false
case sc.IgnoreResourceDeletion != other.IgnoreResourceDeletion:
return false
}
if sc.Extensions == nil && other.Extensions == nil {
return true
}
if ex, ok := sc.Extensions.(interface{ Equal(any) bool }); ok && ex.Equal(other.Extensions) {
return true
}
return false
}

// String returns a string representation of the [ServerConfig].
//
// WARNING: This method is primarily intended for logging and testing
// purposes. The output returned by this method is not guaranteed to be stable
// and may change at any time. Do not rely on it for production use.
func (sc *ServerConfig) String() string {
return strings.Join([]string{sc.ServerURI, fmt.Sprintf("%v", sc.IgnoreResourceDeletion)}, "-")
}

// Authority contains configuration for an xDS control plane authority.
type Authority struct {
// XDSServers contains the list of server configurations for this authority.
XDSServers []ServerConfig

// Extensions can be populated with arbitrary data to be passed to the xDS
// Client's user specific implementations. This field can be used to
// provide additional configuration or context specific to the user's
// needs.
//
// The xDS and LRS clients do not interpret the contents of this field. It
// is the responsibility of the user's implementations to handle and
// interpret these extensions.
// method is implemented for equality checks on ServerIdentifier.
Extensions any
}

// Node represents the identity of the xDS client, allowing
// management servers to identify the source of xDS requests.
// Node represents the identity of the xDS client, allowing xDS and LRS servers
// to identify the source of xDS requests.
type Node struct {
// ID is a string identifier of the application.
ID string
Expand All @@ -150,40 +82,6 @@ type Node struct {
UserAgentName string
// UserAgentVersion is the user agent version of application.
UserAgentVersion string
// ClientFeatures is a list of xDS features supported by this client.
// These features are set within the xDS client, but may be overridden only
// for testing purposes.
clientFeatures []string
}

// toProto converts an instance of [Node] to its protobuf representation.
func (n Node) toProto() *v3corepb.Node {
return &v3corepb.Node{
Id: n.ID,
Cluster: n.Cluster,
Locality: func() *v3corepb.Locality {
if n.Locality.isEmpty() {
return nil
}
return &v3corepb.Locality{
Region: n.Locality.Region,
Zone: n.Locality.Zone,
SubZone: n.Locality.SubZone,
}
}(),
Metadata: func() *structpb.Struct {
if n.Metadata == nil {
return nil
}
if md, ok := n.Metadata.(*structpb.Struct); ok {
return proto.Clone(md).(*structpb.Struct)
}
return nil
}(),
UserAgentName: n.UserAgentName,
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: n.UserAgentVersion},
ClientFeatures: slices.Clone(n.clientFeatures),
}
}

// Locality represents the location of the xDS client application.
Expand All @@ -195,13 +93,3 @@ type Locality struct {
// SubZone is the further subdivision within a zone.
SubZone string
}

// isEmpty reports whether l is considered empty.
func (l Locality) isEmpty() bool {
return l.equal(Locality{})
}

// equal returns true if l and other are considered equal.
func (l Locality) equal(other Locality) bool {
return l.Region == other.Region && l.Zone == other.Zone && l.SubZone == other.SubZone
}
32 changes: 16 additions & 16 deletions xds/internal/clients/grpctransport/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,38 @@
"google.golang.org/grpc/xds/internal/clients"
)

// ServerConfigExtension holds settings for connecting to a gRPC server,
// ServerIdentifierExtension holds settings for connecting to a gRPC server,
// such as an xDS management or an LRS server.
type ServerConfigExtension struct {
type ServerIdentifierExtension struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have an Equal method, but our own docs recommend one? Are we sure we actually need one? If only the transport would need it, then I don't think we do.

Copy link
Contributor Author

@purnesh42H purnesh42H Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need it when de-duplicating the transport channel. That will come in final xDS client implementation PR. I was thinking of adding it at that time. Why do you think we won't need one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deduplicating will require it to be a map key, not to have an Equal method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can hash the complex fields like extensions right? So, we need a equal method to get the right key for the map and then lookup that key. Something like done here for xDS Server Config

if a.activeXDSChannel != nil && a.activeXDSChannel.serverConfig.Equal(serverConfig) {
.

So, ServerConfig needs equal method and in turn we need to check equality for embedded ServerIdentifier. I don't think we can get away without equality unless every field has a String() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@easwars wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed internally with @dfawley. For now, we can remove and see if we can find other ways to tackle comparing channels.

// Credentials will be used for all gRPC transports. If it is unset,
// transport creation will fail.
Credentials credentials.Bundle
}

// Builder creates gRPC-based Transports. It must be paired with ServerConfigs
// that contain an Extension field of type ServerConfigExtension.
// Builder creates gRPC-based Transports. It must be paired with ServerIdentifiers
// that contain an Extension field of type ServerIdentifierExtension.
type Builder struct{}

// Build returns a gRPC-based clients.Transport.
//
// The Extension field of the ServerConfig must be a ServerConfigExtension.
func (b *Builder) Build(sc clients.ServerConfig) (clients.Transport, error) {
if sc.ServerURI == "" {
return nil, fmt.Errorf("grpctransport: ServerURI is not set in ServerConfig")
// The Extension field of the ServerIdentifier must be a ServerIdentifierExtension.
func (b *Builder) Build(si clients.ServerIdentifier) (clients.Transport, error) {
if si.ServerURI == "" {
return nil, fmt.Errorf("grpctransport: ServerURI is not set in ServerIdentifier")
}
if sc.Extensions == nil {
return nil, fmt.Errorf("grpctransport: Extensions is not set in ServerConfig")
if si.Extensions == nil {
return nil, fmt.Errorf("grpctransport: Extensions is not set in ServerIdentifier")
}
sce, ok := sc.Extensions.(ServerConfigExtension)
sce, ok := si.Extensions.(ServerIdentifierExtension)
if !ok {
return nil, fmt.Errorf("grpctransport: Extensions field is %T, but must be %T in ServerConfig", sc.Extensions, ServerConfigExtension{})
return nil, fmt.Errorf("grpctransport: Extensions field is %T, but must be %T in ServerIdentifier", si.Extensions, ServerIdentifierExtension{})
}
if sce.Credentials == nil {
return nil, fmt.Errorf("grptransport: Credentials field is not set in ServerConfigExtension")
return nil, fmt.Errorf("grptransport: Credentials field is not set in ServerIdentifierExtension")
}

// TODO: Incorporate reference count map for existing transports and
// deduplicate transports based on the provided ServerConfig so that
// deduplicate transports based on the provided ServerIdentifier so that
// transport channel to same server can be shared between xDS and LRS
// client.

Expand All @@ -74,9 +74,9 @@
Time: 5 * time.Minute,
Timeout: 20 * time.Second,
})
cc, err := grpc.NewClient(sc.ServerURI, kpCfg, grpc.WithCredentialsBundle(sce.Credentials), grpc.WithDefaultCallOptions(grpc.ForceCodec(&byteCodec{})))
cc, err := grpc.NewClient(si.ServerURI, kpCfg, grpc.WithCredentialsBundle(sce.Credentials), grpc.WithDefaultCallOptions(grpc.ForceCodec(&byteCodec{})))
if err != nil {
return nil, fmt.Errorf("grpctransport: failed to create transport to server %q: %v", sc.ServerURI, err)
return nil, fmt.Errorf("grpctransport: failed to create transport to server %q: %v", si.ServerURI, err)

Check warning on line 79 in xds/internal/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/clients/grpctransport/grpc_transport.go#L79

Added line #L79 was not covered by tests
}

return &grpcTransport{cc: cc}, nil
Expand Down
40 changes: 21 additions & 19 deletions xds/internal/clients/grpctransport/grpc_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (s *testServer) StreamAggregatedResources(stream v3discoverygrpc.Aggregated
return err // Handle other errors
}

// Push received request for client to verify the correct request was
// received.
select {
case s.requestChan <- req:
case <-ctx.Done():
Expand All @@ -130,9 +132,9 @@ func (tc *testCredentials) TransportCredentials() credentials.TransportCredentia
// TestBuild_Success verifies that the Builder successfully creates a new
// Transport with a non-nil grpc.ClientConn.
func (s) TestBuild_Success(t *testing.T) {
serverCfg := clients.ServerConfig{
serverCfg := clients.ServerIdentifier{
ServerURI: "server-address",
Extensions: ServerConfigExtension{Credentials: &testCredentials{transportCredentials: local.NewCredentials()}},
Extensions: ServerIdentifierExtension{Credentials: &testCredentials{transportCredentials: local.NewCredentials()}},
}

b := &Builder{}
Expand All @@ -151,41 +153,41 @@ func (s) TestBuild_Success(t *testing.T) {
}

// TestBuild_Failure verifies that the Builder returns error when incorrect
// ServerConfig is provided.
// ServerIdentifier is provided.
//
// It covers the following scenarios:
// - ServerURI is empty.
// - Extensions is nil.
// - Extensions is not ServerConfigExtension.
// - Extensions is not ServerIdentifierExtension.
// - Credentials are nil.
func (s) TestBuild_Failure(t *testing.T) {
tests := []struct {
name string
serverCfg clients.ServerConfig
serverCfg clients.ServerIdentifier
}{
{
name: "ServerURI is empty",
serverCfg: clients.ServerConfig{
serverCfg: clients.ServerIdentifier{
ServerURI: "",
Extensions: ServerConfigExtension{Credentials: insecure.NewBundle()},
Extensions: ServerIdentifierExtension{Credentials: insecure.NewBundle()},
},
},
{
name: "Extensions is nil",
serverCfg: clients.ServerConfig{ServerURI: "server-address"},
serverCfg: clients.ServerIdentifier{ServerURI: "server-address"},
},
{
name: "Extensions is not a ServerConfigExtension",
serverCfg: clients.ServerConfig{
name: "Extensions is not a ServerIdentifierExtension",
serverCfg: clients.ServerIdentifier{
ServerURI: "server-address",
Extensions: 1,
},
},
{
name: "ServerConfigExtension Credentials is nil",
serverCfg: clients.ServerConfig{
name: "ServerIdentifierExtension Credentials is nil",
serverCfg: clients.ServerIdentifier{
ServerURI: "server-address",
Extensions: ServerConfigExtension{},
Extensions: ServerIdentifierExtension{},
},
},
}
Expand All @@ -208,9 +210,9 @@ func (s) TestBuild_Failure(t *testing.T) {
func (s) TestNewStream_Success(t *testing.T) {
ts := setupTestServer(t, &v3discoverypb.DiscoveryResponse{VersionInfo: "1"})

serverCfg := clients.ServerConfig{
serverCfg := clients.ServerIdentifier{
ServerURI: ts.address,
Extensions: ServerConfigExtension{Credentials: insecure.NewBundle()},
Extensions: ServerIdentifierExtension{Credentials: insecure.NewBundle()},
}
builder := Builder{}
transport, err := builder.Build(serverCfg)
Expand All @@ -229,9 +231,9 @@ func (s) TestNewStream_Success(t *testing.T) {
// TestNewStream_Error verifies that NewStream() returns an error
// when attempting to create a stream with an invalid server URI.
func (s) TestNewStream_Error(t *testing.T) {
serverCfg := clients.ServerConfig{
serverCfg := clients.ServerIdentifier{
ServerURI: "invalid-server-uri",
Extensions: ServerConfigExtension{Credentials: insecure.NewBundle()},
Extensions: ServerIdentifierExtension{Credentials: insecure.NewBundle()},
}
builder := Builder{}
transport, err := builder.Build(serverCfg)
Expand Down Expand Up @@ -262,9 +264,9 @@ func (s) TestStream_SendAndRecv(t *testing.T) {
ts := setupTestServer(t, &v3discoverypb.DiscoveryResponse{VersionInfo: "1"})

// Build a grpc-based transport to the above server.
serverCfg := clients.ServerConfig{
serverCfg := clients.ServerIdentifier{
ServerURI: ts.address,
Extensions: ServerConfigExtension{Credentials: insecure.NewBundle()},
Extensions: ServerIdentifierExtension{Credentials: insecure.NewBundle()},
}
builder := Builder{}
transport, err := builder.Build(serverCfg)
Expand Down
Loading