From 0d34eebd4395852c16e38f5a9ccb2524b8bf692e Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Wed, 15 Jan 2025 15:26:07 -0500 Subject: [PATCH] chore: update based on pr feedback (move fx code into fx.go & test update) - move fx code into fx.go - update test defaults for baseClient_test.go --- chrysom/basicClient.go | 26 ----------- chrysom/basicClient_test.go | 36 ++++++--------- chrysom/fx.go | 87 +++++++++++++++++++++++++++++++++++++ chrysom/listenerClient.go | 50 --------------------- fx.go | 70 +++++++++++++++++++++++++++++ service.go | 68 +++-------------------------- 6 files changed, 178 insertions(+), 159 deletions(-) create mode 100644 chrysom/fx.go create mode 100644 fx.go diff --git a/chrysom/basicClient.go b/chrysom/basicClient.go index 6ab740e..cead42d 100644 --- a/chrysom/basicClient.go +++ b/chrysom/basicClient.go @@ -15,7 +15,6 @@ import ( "github.com/xmidt-org/ancla/auth" "github.com/xmidt-org/ancla/model" - "go.uber.org/fx" "go.uber.org/zap" ) @@ -93,31 +92,6 @@ const ( // Items is a slice of model.Item(s) . type Items []model.Item -// GetLogger returns a logger from the given context. -type GetLogger func(context.Context) *zap.Logger - -// SetLogger embeds the `Listener.logger` in outgoing request contexts for `Listener.Update` calls. -type SetLogger func(context.Context, *zap.Logger) context.Context - -type BasicClientIn struct { - fx.In - - // Ancla Client config. - Config BasicClientConfig - // GetLogger returns a logger from the given context. - GetLogger GetLogger -} - -// ProvideBasicClient provides a new BasicClient. -func ProvideBasicClient(in BasicClientIn) (*BasicClient, error) { - client, err := NewBasicClient(in.Config, in.GetLogger) - if err != nil { - return nil, errors.Join(errFailedConfig, err) - } - - return client, nil -} - // NewBasicClient creates a new BasicClient that can be used to // make requests to Argus. func NewBasicClient(config BasicClientConfig, diff --git a/chrysom/basicClient_test.go b/chrysom/basicClient_test.go index 4fbb00b..6f19710 100644 --- a/chrysom/basicClient_test.go +++ b/chrysom/basicClient_test.go @@ -54,34 +54,30 @@ func TestValidateBasicConfig(t *testing.T) { { Description: "No address", Input: &BasicClientConfig{ - HTTPClient: http.DefaultClient, - Bucket: "bucket-name", + Bucket: "bucket-name", }, ExpectedErr: ErrAddressEmpty, }, { Description: "No bucket", Input: &BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: "example.com", + Address: "example.com", }, ExpectedErr: ErrBucketEmpty, }, { Description: "All default values", Input: &BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: "example.com", - Bucket: "bucket-name", + Address: "example.com", + Bucket: "bucket-name", }, ExpectedConfig: allDefaultsCaseConfig, }, { Description: "All defined", Input: &BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: "example.com", - Bucket: "amazing-bucket", + Address: "example.com", + Bucket: "amazing-bucket", }, ExpectedConfig: allDefinedCaseConfig, }, @@ -188,9 +184,8 @@ func TestSendRequest(t *testing.T) { defer server.Close() client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: "example.com", - Bucket: "bucket-name", + Address: "example.com", + Bucket: "bucket-name", }, func(context.Context) *zap.Logger { return zap.NewNop() @@ -300,9 +295,8 @@ func TestGetItems(t *testing.T) { })) client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: server.URL, - Bucket: bucket, + Address: server.URL, + Bucket: bucket, }, func(context.Context) *zap.Logger { return zap.NewNop() @@ -446,9 +440,8 @@ func TestPushItem(t *testing.T) { })) client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: server.URL, - Bucket: bucket, + Address: server.URL, + Bucket: bucket, }, func(context.Context) *zap.Logger { return zap.NewNop() @@ -554,9 +547,8 @@ func TestRemoveItem(t *testing.T) { })) client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: server.URL, - Bucket: bucket, + Address: server.URL, + Bucket: bucket, }, func(context.Context) *zap.Logger { return zap.NewNop() }) diff --git a/chrysom/fx.go b/chrysom/fx.go new file mode 100644 index 0000000..7d8cab9 --- /dev/null +++ b/chrysom/fx.go @@ -0,0 +1,87 @@ +// SPDX-FileCopyrightText: 2025 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package chrysom + +import ( + "context" + "errors" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/fx" + "go.uber.org/zap" +) + +// GetLogger returns a logger from the given context. +type GetLogger func(context.Context) *zap.Logger + +// SetLogger embeds the `Listener.logger` in outgoing request contexts for `Listener.Update` calls. +type SetLogger func(context.Context, *zap.Logger) context.Context + +type BasicClientIn struct { + fx.In + + // Ancla Client config. + Config BasicClientConfig + // GetLogger returns a logger from the given context. + GetLogger GetLogger +} + +// ProvideBasicClient provides a new BasicClient. +func ProvideBasicClient(in BasicClientIn) (*BasicClient, error) { + client, err := NewBasicClient(in.Config, in.GetLogger) + if err != nil { + return nil, errors.Join(errFailedConfig, err) + } + + return client, nil +} + +// ListenerConfig contains config data for polling the Argus client. +type ListenerClientIn struct { + fx.In + + // Listener fetches a copy of all items within a bucket on + // an interval based on `BasicClientConfig.PullInterval`. + // (Optional). If not provided, listening won't be enabled for this client. + Listener Listener + // Config configures the ancla client and its listeners. + Config BasicClientConfig + // PollsTotalCounter measures the number of polls (and their success/failure outcomes) to fetch new items. + PollsTotalCounter *prometheus.CounterVec `name:"chrysom_polls_total"` + // Reader is the DB interface used to fetch new items using `GeItems`. + Reader Reader + // GetLogger returns a logger from the given context. + GetLogger GetLogger + // SetLogger embeds the `Listener.logger` in outgoing request contexts for `Listener.Update` calls. + SetLogger SetLogger +} + +// ProvideListenerClient provides a new ListenerClient. +func ProvideListenerClient(in ListenerClientIn) (*ListenerClient, error) { + client, err := NewListenerClient(in.Listener, in.GetLogger, in.SetLogger, in.Config.PullInterval, in.PollsTotalCounter, in.Reader) + if err != nil { + return nil, errors.Join(err, errFailedConfig) + } + + return client, nil +} + +func ProvideDefaultListenerReader(client *BasicClient) Reader { + return client +} + +type StartListenerIn struct { + fx.In + + Listener *ListenerClient + LC fx.Lifecycle +} + +// ProvideStartListenerClient starts the Argus listener client service. +func ProvideStartListenerClient(in StartListenerIn) error { + in.Listener.Start(context.Background()) + in.LC.Append(fx.StopHook(in.Listener.Stop)) + + return nil +} diff --git a/chrysom/listenerClient.go b/chrysom/listenerClient.go index c9b1b0d..6682f00 100644 --- a/chrysom/listenerClient.go +++ b/chrysom/listenerClient.go @@ -10,7 +10,6 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/fx" "go.uber.org/zap" ) @@ -38,26 +37,6 @@ const ( defaultPullInterval = time.Second * 5 ) -// ListenerConfig contains config data for polling the Argus client. -type ListenerClientIn struct { - fx.In - - // Listener fetches a copy of all items within a bucket on - // an interval based on `BasicClientConfig.PullInterval`. - // (Optional). If not provided, listening won't be enabled for this client. - Listener Listener - // Config configures the ancla client and its listeners. - Config BasicClientConfig - // PollsTotalCounter measures the number of polls (and their success/failure outcomes) to fetch new items. - PollsTotalCounter *prometheus.CounterVec `name:"chrysom_polls_total"` - // Reader is the DB interface used to fetch new items using `GeItems`. - Reader Reader - // GetLogger returns a logger from the given context. - GetLogger GetLogger - // SetLogger embeds the `Listener.logger` in outgoing request contexts for `Listener.Update` calls. - SetLogger SetLogger -} - // ListenerClient is the client used to poll Argus for updates. type ListenerClient struct { observer *observerConfig @@ -76,20 +55,6 @@ type observerConfig struct { state int32 } -// ProvideListenerClient provides a new ListenerClient. -func ProvideListenerClient(in ListenerClientIn) (*ListenerClient, error) { - client, err := NewListenerClient(in.Listener, in.GetLogger, in.SetLogger, in.Config.PullInterval, in.PollsTotalCounter, in.Reader) - if err != nil { - return nil, errors.Join(err, errFailedConfig) - } - - return client, nil -} - -func ProvideDefaultListenerReader(client *BasicClient) Reader { - return client -} - // NewListenerClient creates a new ListenerClient to be used to poll Argus // for updates. func NewListenerClient(listener Listener, @@ -188,18 +153,3 @@ func (c *ListenerClient) Stop(ctx context.Context) error { atomic.SwapInt32(&c.observer.state, stopped) return nil } - -type StartListenerIn struct { - fx.In - - Listener *ListenerClient - LC fx.Lifecycle -} - -// ProvideStartListenerClient starts the Argus listener client service. -func ProvideStartListenerClient(in StartListenerIn) error { - in.Listener.Start(context.Background()) - in.LC.Append(fx.StopHook(in.Listener.Stop)) - - return nil -} diff --git a/fx.go b/fx.go new file mode 100644 index 0000000..61b3df2 --- /dev/null +++ b/fx.go @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: 2025 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package ancla + +import ( + "github.com/xmidt-org/ancla/chrysom" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/fx" +) + +type ServiceIn struct { + fx.In + + // Ancla Client. + BasicClient *chrysom.BasicClient +} + +// ProvideService builds the Argus client service from the given configuration. +func ProvideService(in ServiceIn) Service { + return NewService(in.BasicClient) +} + +// TODO: Refactor and move Watch and Listener related code to chrysom. +type DefaultListenersIn struct { + fx.In + + // Metric for webhook list size, used by the webhook list size watcher listener. + WebhookListSizeGauge prometheus.Gauge `name:"webhook_list_size"` +} + +type DefaultListenerOut struct { + fx.Out + + Watchers []Watch `group:"watchers,flatten"` +} + +func ProvideDefaultListenerWatchers(in DefaultListenersIn) DefaultListenerOut { + var watchers []Watch + + watchers = append(watchers, webhookListSizeWatch(in.WebhookListSizeGauge)) + + return DefaultListenerOut{ + Watchers: watchers, + } +} + +type ListenerIn struct { + fx.In + + Shutdowner fx.Shutdowner + // Watchers are called by the Listener when new webhooks are fetched. + Watchers []Watch `group:"watchers"` +} + +func ProvideListener(in ListenerIn) chrysom.Listener { + return chrysom.ListenerFunc(func(items chrysom.Items) { + iws, err := ItemsToInternalWebhooks(items) + if err != nil { + in.Shutdowner.Shutdown(fx.ExitCode(1)) + + return + } + + for _, watch := range in.Watchers { + watch.Update(iws) + } + }) +} diff --git a/service.go b/service.go index 4ce7ddc..120b74d 100644 --- a/service.go +++ b/service.go @@ -9,9 +9,7 @@ import ( "fmt" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/xmidt-org/ancla/chrysom" - "go.uber.org/fx" ) const errFmt = "%w: %v" @@ -25,7 +23,6 @@ var ( ) // Service describes the core operations around webhook subscriptions. -// Initialize() provides a service ready to use and the controls around watching for updates. type Service interface { // Add adds the given owned webhook to the current list of webhooks. If the operation // succeeds, a non-nil error is returned. @@ -35,12 +32,13 @@ type Service interface { GetAll(ctx context.Context) ([]InternalWebhook, error) } -// Config contains information needed to initialize the Argus Client service. +// Config contains information needed to initialize the Argus database client. type Config struct { + // BasicClientConfig is the configuration for the Argus database client. BasicClientConfig chrysom.BasicClientConfig // DisablePartnerIDs, if true, will allow webhooks to register without - // checking the validity of the partnerIDs in the request + // checking the validity of the partnerIDs in the request. DisablePartnerIDs bool // Validation provides options for validating the webhook's URL and TTL @@ -52,10 +50,13 @@ type Config struct { } type service struct { + // argus is the Argus database client. argus chrysom.PushReader now func() time.Time } +// Add adds the given owned webhook to the current list of webhooks. If the operation +// succeeds, a non-nil error is returned. func (s *service) Add(ctx context.Context, owner string, iw InternalWebhook) error { item, err := InternalWebhookToItem(s.now, iw) if err != nil { @@ -93,65 +94,10 @@ func (s *service) GetAll(ctx context.Context) ([]InternalWebhook, error) { return iws, nil } +// NewService returns an ancla client used to interact with an Argus database. func NewService(client *chrysom.BasicClient) *service { return &service{ argus: client, now: time.Now, } } - -type ServiceIn struct { - fx.In - - BasicClient *chrysom.BasicClient -} - -// ProvideService builds the Argus client service from the given configuration. -func ProvideService(in ServiceIn) Service { - return NewService(in.BasicClient) -} - -// TODO: Refactor and move Watch and Listener related code to chrysom. -type DefaultListenersIn struct { - fx.In - - WebhookListSizeGauge prometheus.Gauge `name:"webhook_list_size"` -} - -type DefaultListenerOut struct { - fx.Out - - Watchers []Watch `group:"watchers,flatten"` -} - -func ProvideDefaultListeners(in DefaultListenersIn) DefaultListenerOut { - var watchers []Watch - - watchers = append(watchers, webhookListSizeWatch(in.WebhookListSizeGauge)) - - return DefaultListenerOut{ - Watchers: watchers, - } -} - -type ListenerIn struct { - fx.In - - Shutdowner fx.Shutdowner - Watchers []Watch `group:"watchers"` -} - -func ProvideListener(in ListenerIn) chrysom.Listener { - return chrysom.ListenerFunc(func(items chrysom.Items) { - iws, err := ItemsToInternalWebhooks(items) - if err != nil { - in.Shutdowner.Shutdown(fx.ExitCode(1)) - - return - } - - for _, watch := range in.Watchers { - watch.Update(iws) - } - }) -}