From 3d643e06810e08f65e5b9f98e2a1353a525459af Mon Sep 17 00:00:00 2001 From: George Date: Tue, 27 Oct 2020 11:45:05 +0000 Subject: [PATCH] refactor(notification): move rule service into own package (#19804) * refactor(notification): move rule service into own package * chore(launcher): fix tests to use clients as opposed to direct kv service * chore(influx): update task cli to consume core domain model task from client * chore(kv): remove rule service behaviours from kv This also introduces the org id resolver type. Which is transplanted from the kv service. As this one function coupled all resource capabilities onto the kv service. Making removing these capabilities impossible. Moving this type out into its own package which depends on each service explicitly ensures we don't have one type which has to implement all the service contracts. * fix(launcher): remove double reference to influxdb package --- authorizer/authorize_find.go | 2 +- authorizer/label.go | 17 +- authorizer/urm.go | 18 +- authorizer/urm_test.go | 4 +- cmd/influx/task.go | 10 +- cmd/influxd/launcher/launcher.go | 32 +- cmd/influxd/launcher/launcher_helpers.go | 35 +- cmd/influxd/launcher/pkger_test.go | 37 +- http/api_handler.go | 2 +- http/document_service_test.go | 9 +- http/notification_rule.go | 39 +- http/task_service.go | 67 +- internal/resource/org_id.go | 194 +++ kv/initial_migration.go | 2 +- kv/notification_rule.go | 498 -------- kv/notification_rule_test.go | 93 -- kv/org.go | 82 -- label/middleware_auth.go | 16 +- notification/rule/service/service.go | 497 ++++++++ .../rule/service/service_external_test.go | 1115 +++++------------ notification/rule/service/service_test.go | 162 +++ pkger/service.go | 3 +- testing/notification_endpoint.go | 1 - 23 files changed, 1362 insertions(+), 1573 deletions(-) create mode 100644 internal/resource/org_id.go delete mode 100644 kv/notification_rule.go delete mode 100644 kv/notification_rule_test.go create mode 100644 notification/rule/service/service.go rename testing/notification_rule.go => notification/rule/service/service_external_test.go (66%) create mode 100644 notification/rule/service/service_test.go diff --git a/authorizer/authorize_find.go b/authorizer/authorize_find.go index f15c3b5f677..f517ceee01a 100644 --- a/authorizer/authorize_find.go +++ b/authorizer/authorize_find.go @@ -284,7 +284,7 @@ func AuthorizeFindChecks(ctx context.Context, rs []influxdb.Check) ([]influxdb.C } // AuthorizeFindUserResourceMappings takes the given items and returns only the ones that the user is authorized to read. -func AuthorizeFindUserResourceMappings(ctx context.Context, os OrganizationService, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) { +func AuthorizeFindUserResourceMappings(ctx context.Context, os OrgIDResolver, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) { // This filters without allocating // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating rrs := rs[:0] diff --git a/authorizer/label.go b/authorizer/label.go index d3b50590ef3..38ef58b8674 100644 --- a/authorizer/label.go +++ b/authorizer/label.go @@ -2,7 +2,6 @@ package authorizer import ( "context" - "errors" "github.com/influxdata/influxdb/v2" ) @@ -12,16 +11,16 @@ var _ influxdb.LabelService = (*LabelService)(nil) // LabelService wraps a influxdb.LabelService and authorizes actions // against it appropriately. type LabelService struct { - s influxdb.LabelService - orgSvc OrganizationService + s influxdb.LabelService + orgIDResolver OrgIDResolver } // NewLabelServiceWithOrg constructs an instance of an authorizing label serivce. // Replaces NewLabelService. -func NewLabelServiceWithOrg(s influxdb.LabelService, orgSvc OrganizationService) *LabelService { +func NewLabelServiceWithOrg(s influxdb.LabelService, orgIDResolver OrgIDResolver) *LabelService { return &LabelService{ - s: s, - orgSvc: orgSvc, + s: s, + orgIDResolver: orgIDResolver, } } @@ -55,10 +54,8 @@ func (s *LabelService) FindResourceLabels(ctx context.Context, filter influxdb.L if err := filter.ResourceType.Valid(); err != nil { return nil, err } - if s.orgSvc == nil { - return nil, errors.New("failed to find orgSvc") - } - orgID, err := s.orgSvc.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) + + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) if err != nil { return nil, err } diff --git a/authorizer/urm.go b/authorizer/urm.go index 02566aa6e18..b57097a3f9e 100644 --- a/authorizer/urm.go +++ b/authorizer/urm.go @@ -6,19 +6,19 @@ import ( "github.com/influxdata/influxdb/v2" ) -type OrganizationService interface { +type OrgIDResolver interface { FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) } type URMService struct { - s influxdb.UserResourceMappingService - orgService OrganizationService + s influxdb.UserResourceMappingService + orgIDResolver OrgIDResolver } -func NewURMService(orgSvc OrganizationService, s influxdb.UserResourceMappingService) *URMService { +func NewURMService(orgIDResolver OrgIDResolver, s influxdb.UserResourceMappingService) *URMService { return &URMService{ - s: s, - orgService: orgSvc, + s: s, + orgIDResolver: orgIDResolver, } } @@ -27,11 +27,11 @@ func (s *URMService) FindUserResourceMappings(ctx context.Context, filter influx if err != nil { return nil, 0, err } - return AuthorizeFindUserResourceMappings(ctx, s.orgService, urms) + return AuthorizeFindUserResourceMappings(ctx, s.orgIDResolver, urms) } func (s *URMService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error { - orgID, err := s.orgService.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID) + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID) if err != nil { return err } @@ -49,7 +49,7 @@ func (s *URMService) DeleteUserResourceMapping(ctx context.Context, resourceID i } for _, urm := range urms { - orgID, err := s.orgService.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID) + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID) if err != nil { return err } diff --git a/authorizer/urm_test.go b/authorizer/urm_test.go index bc1da8cf1a2..e1dd7de79e6 100644 --- a/authorizer/urm_test.go +++ b/authorizer/urm_test.go @@ -23,7 +23,7 @@ func (s *OrgService) FindResourceOrganizationID(ctx context.Context, rt influxdb func TestURMService_FindUserResourceMappings(t *testing.T) { type fields struct { UserResourceMappingService influxdb.UserResourceMappingService - OrgService authorizer.OrganizationService + OrgService authorizer.OrgIDResolver } type args struct { permission influxdb.Permission @@ -146,7 +146,7 @@ func TestURMService_FindUserResourceMappings(t *testing.T) { func TestURMService_WriteUserResourceMapping(t *testing.T) { type fields struct { UserResourceMappingService influxdb.UserResourceMappingService - OrgService authorizer.OrganizationService + OrgService authorizer.OrgIDResolver } type args struct { permission influxdb.Permission diff --git a/cmd/influx/task.go b/cmd/influx/task.go index 070e07310c4..ee7e08e1e5d 100644 --- a/cmd/influx/task.go +++ b/cmd/influx/task.go @@ -171,7 +171,7 @@ func taskFindF(cmd *cobra.Command, args []string) error { } filter.Limit = taskFindFlags.limit - var tasks []http.Task + var tasks []*influxdb.Task if taskFindFlags.id != "" { id, err := influxdb.IDFromString(taskFindFlags.id) @@ -184,7 +184,7 @@ func taskFindF(cmd *cobra.Command, args []string) error { return err } - tasks = append(tasks, *task) + tasks = append(tasks, task) } else { tasks, _, err = s.FindTasks(context.Background(), filter) if err != nil { @@ -322,8 +322,8 @@ func taskDeleteF(cmd *cobra.Command, args []string) error { type taskPrintOpts struct { hideHeaders bool json bool - task *http.Task - tasks []http.Task + task *influxdb.Task + tasks []*influxdb.Task } func printTasks(w io.Writer, opts taskPrintOpts) error { @@ -351,7 +351,7 @@ func printTasks(w io.Writer, opts taskPrintOpts) error { ) if opts.task != nil { - opts.tasks = append(opts.tasks, *opts.task) + opts.tasks = append(opts.tasks, opts.task) } for _, t := range opts.tasks { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 6df37580e02..b7aa2f2746a 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -31,6 +31,7 @@ import ( iqlquery "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/inmem" "github.com/influxdata/influxdb/v2/internal/fs" + "github.com/influxdata/influxdb/v2/internal/resource" "github.com/influxdata/influxdb/v2/kit/cli" "github.com/influxdata/influxdb/v2/kit/feature" overrideflagger "github.com/influxdata/influxdb/v2/kit/feature/override" @@ -45,6 +46,7 @@ import ( "github.com/influxdata/influxdb/v2/label" influxlogger "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/nats" + ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service" "github.com/influxdata/influxdb/v2/pkger" infprom "github.com/influxdata/influxdb/v2/prometheus" "github.com/influxdata/influxdb/v2/query" @@ -989,7 +991,14 @@ func (m *Launcher) run(ctx context.Context) (err error) { var notificationRuleSvc platform.NotificationRuleStore { coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor) - notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator) + notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService) + if err != nil { + return err + } + + // tasks service notification middleware which keeps task service up to date + // with persisted changes to notification rules. + notificationRuleSvc = middleware.NewNotificationRuleStore(notificationRuleSvc, m.kvService, coordinator) } // NATS streaming server @@ -1121,6 +1130,25 @@ func (m *Launcher) run(ctx context.Context) (err error) { onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, metric.WithSuffix("new")) // with metrics onboardSvc = tenant.NewOnboardingLogger(m.log.With(zap.String("handler", "onboard")), onboardSvc) // with logging + // orgIDResolver is a deprecated type which combines the lookups + // of multiple resources into one type, used to resolve the resources + // associated org ID. It is a stop-gap while we move this behaviour + // off of *kv.Service to aid in reducing the coupling on this type. + orgIDResolver := &resource.OrgIDResolver{ + AuthorizationFinder: authSvc, + BucketFinder: ts.BucketService, + OrganizationFinder: ts.OrganizationService, + DashboardFinder: dashboardSvc, + SourceFinder: sourceSvc, + TaskFinder: taskSvc, + TelegrafConfigFinder: telegrafSvc, + VariableFinder: variableSvc, + TargetFinder: scraperTargetSvc, + CheckFinder: checkSvc, + NotificationEndpointFinder: notificationEndpointStore, + NotificationRuleFinder: notificationRuleSvc, + } + m.apibackend = &http.APIBackend{ AssetsPath: m.assetsPath, HTTPErrorHandler: kithttp.ErrorHandler(0), @@ -1169,7 +1197,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { SecretService: secretSvc, LookupService: lookupSvc, DocumentService: m.kvService, - OrgLookupService: m.kvService, + OrgLookupService: orgIDResolver, WriteEventRecorder: infprom.NewEventRecorder("write"), QueryEventRecorder: infprom.NewEventRecorder("query"), Flagger: m.flagger, diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 7c06b577493..75e33a4455c 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -16,7 +16,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/lang" - platform "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/bolt" influxdbcontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/http" @@ -37,10 +37,10 @@ type TestLauncher struct { Path string // Initialized after calling the Setup() helper. - User *platform.User - Org *platform.Organization - Bucket *platform.Bucket - Auth *platform.Authorization + User *influxdb.User + Org *influxdb.Organization + Bucket *influxdb.Bucket + Auth *influxdb.Authorization httpClient *httpc.Client @@ -127,7 +127,7 @@ func (tl *TestLauncher) ShutdownOrFail(tb testing.TB, ctx context.Context) { // Setup creates a new user, bucket, org, and auth token. func (tl *TestLauncher) Setup() error { - results, err := tl.OnBoard(&platform.OnboardingRequest{ + results, err := tl.OnBoard(&influxdb.OnboardingRequest{ User: "USER", Password: "PASSWORD", Org: "ORG", @@ -153,13 +153,13 @@ func (tl *TestLauncher) SetupOrFail(tb testing.TB) { // OnBoard attempts an on-boarding request. // The on-boarding status is also reset to allow multiple user/org/buckets to be created. -func (tl *TestLauncher) OnBoard(req *platform.OnboardingRequest) (*platform.OnboardingResults, error) { +func (tl *TestLauncher) OnBoard(req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { return tl.apibackend.OnboardingService.OnboardInitialUser(context.Background(), req) } // OnBoardOrFail attempts an on-boarding request or fails on error. // The on-boarding status is also reset to allow multiple user/org/buckets to be created. -func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingRequest) *platform.OnboardingResults { +func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *influxdb.OnboardingRequest) *influxdb.OnboardingResults { tb.Helper() res, err := tl.OnBoard(req) if err != nil { @@ -169,7 +169,7 @@ func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingReq } // WriteOrFail attempts a write to the organization and bucket identified by to or fails if there is an error. -func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *platform.OnboardingResults, data string) { +func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *influxdb.OnboardingResults, data string) { tb.Helper() resp, err := nethttp.DefaultClient.Do(tl.NewHTTPRequestOrFail(tb, "POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", to.Org.ID, to.Bucket.ID), to.Auth.Token, data)) if err != nil { @@ -295,7 +295,7 @@ func (tl *TestLauncher) QueryAndNopConsume(ctx context.Context, req *query.Reque // FluxQueryOrFail performs a query to the specified organization and returns the results // or fails if there is an error. -func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organization, token string, query string) string { +func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *influxdb.Organization, token string, query string) string { tb.Helper() b, err := http.SimpleQuery(tl.URL(), query, org.Name, token) @@ -308,7 +308,7 @@ func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organizatio // QueryFlux returns the csv response from a flux query. // It also removes all the \r to make it easier to write tests. -func (tl *TestLauncher) QueryFlux(tb testing.TB, org *platform.Organization, token, query string) string { +func (tl *TestLauncher) QueryFlux(tb testing.TB, org *influxdb.Organization, token, query string) string { tb.Helper() b, err := http.SimpleQuery(tl.URL(), query, org.Name, token) @@ -367,7 +367,7 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService { return &http.BucketService{Client: tl.HTTPClient(tb)} } -func (tl *TestLauncher) CheckService() platform.CheckService { +func (tl *TestLauncher) CheckService() influxdb.CheckService { return tl.kvService } @@ -386,11 +386,12 @@ func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.Notific return http.NewNotificationEndpointService(tl.HTTPClient(tb)) } -func (tl *TestLauncher) NotificationRuleService() platform.NotificationRuleStore { - return tl.kvService +func (tl *TestLauncher) NotificationRuleService(tb testing.TB) influxdb.NotificationRuleStore { + tb.Helper() + return http.NewNotificationRuleService(tl.HTTPClient(tb)) } -func (tl *TestLauncher) OrgService(tb testing.TB) platform.OrganizationService { +func (tl *TestLauncher) OrgService(tb testing.TB) influxdb.OrganizationService { return tl.kvService } @@ -398,7 +399,7 @@ func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC { return &pkger.HTTPRemoteService{Client: tl.HTTPClient(tb)} } -func (tl *TestLauncher) TaskServiceKV() platform.TaskService { +func (tl *TestLauncher) TaskServiceKV(tb testing.TB) influxdb.TaskService { return tl.kvService } @@ -416,7 +417,7 @@ func (tl *TestLauncher) AuthorizationService(tb testing.TB) *http.AuthorizationS return &http.AuthorizationService{Client: tl.HTTPClient(tb)} } -func (tl *TestLauncher) TaskService(tb testing.TB) *http.TaskService { +func (tl *TestLauncher) TaskService(tb testing.TB) influxdb.TaskService { return &http.TaskService{Client: tl.HTTPClient(tb)} } diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 8a09788bd8e..833d6daa60d 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/http" "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/check" @@ -833,9 +832,9 @@ func TestLauncher_Pkger(t *testing.T) { deleteKillCount: 3, }), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), - pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)), pkger.WithStore(pkger.NewStoreKV(l.Launcher.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -1134,11 +1133,11 @@ func TestLauncher_Pkger(t *testing.T) { pkger.WithLabelSVC(l.LabelService(t)), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithNotificationRuleSVC(&fakeRuleStore{ - NotificationRuleStore: l.NotificationRuleService(), + NotificationRuleStore: l.NotificationRuleService(t), createKillCount: 2, }), pkger.WithStore(pkger.NewStoreKV(l.Launcher.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -2319,10 +2318,10 @@ func TestLauncher_Pkger(t *testing.T) { createKillCount: 2, // hits error on 3rd attempt at creating a mapping }), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), - pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)), pkger.WithOrganizationService(l.OrganizationService()), pkger.WithStore(pkger.NewStoreKV(l.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -2356,13 +2355,13 @@ func TestLauncher_Pkger(t *testing.T) { require.NoError(t, err) assert.Empty(t, endpoints) - rules, _, err := l.NotificationRuleService().FindNotificationRules(ctx, influxdb.NotificationRuleFilter{ + rules, _, err := l.NotificationRuleService(t).FindNotificationRules(ctx, influxdb.NotificationRuleFilter{ OrgID: &l.Org.ID, }) require.NoError(t, err) assert.Empty(t, rules) - tasks, _, err := l.TaskServiceKV().FindTasks(ctx, influxdb.TaskFilter{ + tasks, _, err := l.TaskServiceKV(t).FindTasks(ctx, influxdb.TaskFilter{ OrganizationID: &l.Org.ID, }) require.NoError(t, err) @@ -3445,10 +3444,10 @@ spec: pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), - pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)), pkger.WithOrganizationService(l.OrganizationService()), pkger.WithStore(pkger.NewStoreKV(l.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -4902,7 +4901,7 @@ func (r resourceChecker) mustDeleteLabel(t *testing.T, id influxdb.ID) { func (r resourceChecker) getRule(t *testing.T, getOpt getResourceOptFn) (influxdb.NotificationRule, error) { t.Helper() - ruleSVC := r.tl.NotificationRuleService() + ruleSVC := r.tl.NotificationRuleService(t) var ( rule influxdb.NotificationRule @@ -4944,16 +4943,16 @@ func (r resourceChecker) mustGetRule(t *testing.T, getOpt getResourceOptFn) infl func (r resourceChecker) mustDeleteRule(t *testing.T, id influxdb.ID) { t.Helper() - require.NoError(t, r.tl.NotificationRuleService().DeleteNotificationRule(ctx, id)) + require.NoError(t, r.tl.NotificationRuleService(t).DeleteNotificationRule(ctx, id)) } -func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Task, error) { +func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (influxdb.Task, error) { t.Helper() taskSVC := r.tl.TaskService(t) var ( - task *http.Task + task *influxdb.Task err error ) switch opt := getOpt(); { @@ -4963,11 +4962,11 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta OrganizationID: &r.tl.Org.ID, }) if err != nil { - return http.Task{}, err + return influxdb.Task{}, err } for _, tt := range tasks { if tt.Name == opt.name { - task = &tasks[0] + task = tasks[0] break } } @@ -4977,13 +4976,13 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta require.Fail(t, "did not provide a valid get option") } if task == nil { - return http.Task{}, errors.New("did not find expected task by name") + return influxdb.Task{}, errors.New("did not find expected task by name") } return *task, err } -func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) http.Task { +func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) influxdb.Task { t.Helper() task, err := r.getTask(t, getOpt) diff --git a/http/api_handler.go b/http/api_handler.go index 474f7eddd5a..ba8f66590d1 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -87,7 +87,7 @@ type APIBackend struct { SecretService influxdb.SecretService LookupService influxdb.LookupService ChronografService *server.Service - OrgLookupService authorizer.OrganizationService + OrgLookupService authorizer.OrgIDResolver DocumentService influxdb.DocumentService NotificationRuleStore influxdb.NotificationRuleStore NotificationEndpointService influxdb.NotificationEndpointService diff --git a/http/document_service_test.go b/http/document_service_test.go index 914b165608a..1cf96133ca2 100644 --- a/http/document_service_test.go +++ b/http/document_service_test.go @@ -65,13 +65,14 @@ func setup(t *testing.T) (func(auth influxdb.Authorizer) *httptest.Server, func( if err := ds.CreateDocument(ctx, adoc); err != nil { panic(err) } + // Organizations are needed only for creation. // Need to cleanup for comparison later. adoc.Organizations = nil backend := NewMockDocumentBackend(t) backend.HTTPErrorHandler = http.ErrorHandler(0) backend.DocumentService = authorizer.NewDocumentService(svc) - backend.LabelService = authorizer.NewLabelServiceWithOrg(svc, svc) + backend.LabelService = authorizer.NewLabelServiceWithOrg(svc, staticOrgIDResolver(org.ID)) serverFn := func(auth influxdb.Authorizer) *httptest.Server { handler := httpmock.NewAuthMiddlewareHandler(NewDocumentHandler(backend), auth) return httptest.NewServer(handler) @@ -751,3 +752,9 @@ func DeleteLabel(t *testing.T) { } }) } + +type staticOrgIDResolver influxdb.ID + +func (s staticOrgIDResolver) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) { + return (influxdb.ID)(s), nil +} diff --git a/http/notification_rule.go b/http/notification_rule.go index 1667bceb25b..44afb2d6559 100644 --- a/http/notification_rule.go +++ b/http/notification_rule.go @@ -163,21 +163,41 @@ type notificationRuleResponse struct { LastRunError string `json:"LastRunError,omitempty"` } +type ruleResponseMeta struct { + Labels []influxdb.Label `json:"labels"` + Links notificationRuleLinks `json:"links"` + Status string `json:"status"` + LatestCompleted time.Time `json:"latestCompleted,omitempty"` + LatestScheduled time.Time `json:"latestScheduled,omitempty"` + LastRunStatus string `json:"lastRunStatus,omitempty"` + LastRunError string `json:"lastRunError,omitempty"` +} + +func (resp *notificationRuleResponse) UnmarshalJSON(v []byte) (err error) { + var responseMeta ruleResponseMeta + if err = json.Unmarshal(v, &responseMeta); err != nil { + return + } + + resp.Labels = responseMeta.Labels + resp.Links = responseMeta.Links + resp.Status = responseMeta.Status + resp.LatestCompleted = responseMeta.LatestCompleted + resp.LatestScheduled = responseMeta.LatestScheduled + resp.LastRunStatus = responseMeta.LastRunStatus + resp.LastRunError = responseMeta.LastRunError + + resp.NotificationRule, err = rule.UnmarshalJSON(v) + return +} + func (resp notificationRuleResponse) MarshalJSON() ([]byte, error) { b1, err := json.Marshal(resp.NotificationRule) if err != nil { return nil, err } - b2, err := json.Marshal(struct { - Labels []influxdb.Label `json:"labels"` - Links notificationRuleLinks `json:"links"` - Status string `json:"status"` - LatestCompleted time.Time `json:"latestCompleted,omitempty"` - LatestScheduled time.Time `json:"latestScheduled,omitempty"` - LastRunStatus string `json:"lastRunStatus,omitempty"` - LastRunError string `json:"lastRunError,omitempty"` - }{ + b2, err := json.Marshal(ruleResponseMeta{ Links: resp.Links, Labels: resp.Labels, Status: resp.Status, @@ -825,6 +845,7 @@ func (s *NotificationRuleService) FindNotificationRules(ctx context.Context, fil for _, r := range resp.NotificationRules { rules = append(rules, r.rule) } + return rules, len(rules), nil } diff --git a/http/task_service.go b/http/task_service.go index 7ddfee8f023..6bdc5d5ae04 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/pkg/httpc" + "github.com/influxdata/influxdb/v2/task/options" "go.uber.org/zap" ) @@ -220,6 +221,54 @@ func NewFrontEndTask(t influxdb.Task) Task { } } +func convertTask(t Task) *influxdb.Task { + var ( + latestCompleted time.Time + createdAt time.Time + updatedAt time.Time + offset time.Duration + ) + + if t.LatestCompleted != "" { + latestCompleted, _ = time.Parse(time.RFC3339, t.LatestCompleted) + } + + if t.CreatedAt != "" { + createdAt, _ = time.Parse(time.RFC3339, t.CreatedAt) + } + + if t.UpdatedAt != "" { + updatedAt, _ = time.Parse(time.RFC3339, t.UpdatedAt) + } + + if t.Offset != "" { + var duration options.Duration + if err := duration.Parse(t.Offset); err == nil { + offset, _ = duration.DurationFrom(time.Now()) + } + } + + return &influxdb.Task{ + ID: t.ID, + OrganizationID: t.OrganizationID, + Organization: t.Organization, + OwnerID: t.OwnerID, + Name: t.Name, + Description: t.Description, + Status: t.Status, + Flux: t.Flux, + Every: t.Every, + Cron: t.Cron, + Offset: offset, + LatestCompleted: latestCompleted, + LastRunStatus: t.LastRunStatus, + LastRunError: t.LastRunError, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + Metadata: t.Metadata, + } +} + func customParseDuration(d time.Duration) string { str := "" if d < 0 { @@ -1414,7 +1463,7 @@ type TaskService struct { } // FindTaskByID returns a single task -func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, error) { +func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1424,12 +1473,12 @@ func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, e return nil, err } - return &tr.Task, nil + return convertTask(tr.Task), nil } // FindTasks returns a list of tasks that match a filter (limit 100) and the total count // of matching tasks. -func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]Task, int, error) { +func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1470,15 +1519,15 @@ func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) return nil, 0, err } - tasks := make([]Task, len(tr.Tasks)) + tasks := make([]*influxdb.Task, len(tr.Tasks)) for i := range tr.Tasks { - tasks[i] = tr.Tasks[i].Task + tasks[i] = convertTask(tr.Tasks[i].Task) } return tasks, len(tasks), nil } // CreateTask creates a new task. -func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*Task, error) { +func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() var tr taskResponse @@ -1491,11 +1540,11 @@ func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*T return nil, err } - return &tr.Task, nil + return convertTask(tr.Task), nil } // UpdateTask updates a single task with changeset. -func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*Task, error) { +func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1507,7 +1556,7 @@ func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxd return nil, err } - return &tr.Task, nil + return convertTask(tr.Task), nil } // DeleteTask removes a task by ID and purges all associated data and scheduled runs. diff --git a/internal/resource/org_id.go b/internal/resource/org_id.go new file mode 100644 index 00000000000..e05fb6f9410 --- /dev/null +++ b/internal/resource/org_id.go @@ -0,0 +1,194 @@ +package resource + +import ( + "context" + "fmt" + + "github.com/influxdata/influxdb/v2" +) + +// OrgIDResolver is a type which combines multiple resource services +// in order to resolve the resources associated org ID. +// Ideally you do not need to use this type, it is mostly a stop-gap +// while we migrate responsibilities off of *kv.Service. +// Consider it deprecated. +type OrgIDResolver struct { + AuthorizationFinder interface { + FindAuthorizationByID(context.Context, influxdb.ID) (*influxdb.Authorization, error) + } + BucketFinder interface { + FindBucketByID(context.Context, influxdb.ID) (*influxdb.Bucket, error) + } + OrganizationFinder interface { + FindOrganizationByID(context.Context, influxdb.ID) (*influxdb.Organization, error) + } + DashboardFinder interface { + FindDashboardByID(context.Context, influxdb.ID) (*influxdb.Dashboard, error) + } + SourceFinder interface { + FindSourceByID(context.Context, influxdb.ID) (*influxdb.Source, error) + } + TaskFinder interface { + FindTaskByID(context.Context, influxdb.ID) (*influxdb.Task, error) + } + TelegrafConfigFinder interface { + FindTelegrafConfigByID(context.Context, influxdb.ID) (*influxdb.TelegrafConfig, error) + } + VariableFinder interface { + FindVariableByID(context.Context, influxdb.ID) (*influxdb.Variable, error) + } + TargetFinder interface { + GetTargetByID(context.Context, influxdb.ID) (*influxdb.ScraperTarget, error) + } + CheckFinder interface { + FindCheckByID(context.Context, influxdb.ID) (influxdb.Check, error) + } + NotificationEndpointFinder interface { + FindNotificationEndpointByID(context.Context, influxdb.ID) (influxdb.NotificationEndpoint, error) + } + NotificationRuleFinder interface { + FindNotificationRuleByID(context.Context, influxdb.ID) (influxdb.NotificationRule, error) + } +} + +// FindResourceOrganizationID is used to find the organization that a resource belongs to five the id of a resource and a resource type. +func (o *OrgIDResolver) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) { + switch rt { + case influxdb.AuthorizationsResourceType: + if o.AuthorizationFinder == nil { + break + } + + r, err := o.AuthorizationFinder.FindAuthorizationByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.BucketsResourceType: + if o.BucketFinder == nil { + break + } + + r, err := o.BucketFinder.FindBucketByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.OrgsResourceType: + if o.OrganizationFinder == nil { + break + } + + r, err := o.OrganizationFinder.FindOrganizationByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.ID, nil + case influxdb.DashboardsResourceType: + if o.DashboardFinder == nil { + break + } + + r, err := o.DashboardFinder.FindDashboardByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.SourcesResourceType: + if o.SourceFinder == nil { + break + } + + r, err := o.SourceFinder.FindSourceByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.TasksResourceType: + if o.TaskFinder == nil { + break + } + + r, err := o.TaskFinder.FindTaskByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.TelegrafsResourceType: + if o.TelegrafConfigFinder == nil { + break + } + + r, err := o.TelegrafConfigFinder.FindTelegrafConfigByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.VariablesResourceType: + if o.VariableFinder == nil { + break + } + + r, err := o.VariableFinder.FindVariableByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.ScraperResourceType: + if o.TargetFinder == nil { + break + } + + r, err := o.TargetFinder.GetTargetByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.ChecksResourceType: + if o.CheckFinder == nil { + break + } + + r, err := o.CheckFinder.FindCheckByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.GetOrgID(), nil + case influxdb.NotificationEndpointResourceType: + if o.NotificationEndpointFinder == nil { + break + } + + r, err := o.NotificationEndpointFinder.FindNotificationEndpointByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.GetOrgID(), nil + case influxdb.NotificationRuleResourceType: + if o.NotificationRuleFinder == nil { + break + } + + r, err := o.NotificationRuleFinder.FindNotificationRuleByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.GetOrgID(), nil + } + + return influxdb.InvalidID(), &influxdb.Error{ + Msg: fmt.Sprintf("unsupported resource type %s", rt), + } +} diff --git a/kv/initial_migration.go b/kv/initial_migration.go index 4fa5ac78635..17661be0908 100644 --- a/kv/initial_migration.go +++ b/kv/initial_migration.go @@ -46,7 +46,7 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error { telegrafBucket, telegrafPluginsBucket, urmBucket, - notificationRuleBucket, + []byte("notificationRulev1"), userBucket, userIndex, sourceBucket, diff --git a/kv/notification_rule.go b/kv/notification_rule.go deleted file mode 100644 index fefe1b91be7..00000000000 --- a/kv/notification_rule.go +++ /dev/null @@ -1,498 +0,0 @@ -package kv - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/influxdata/influxdb/v2/notification/rule" - "go.uber.org/zap" - - "github.com/influxdata/influxdb/v2" -) - -var ( - notificationRuleBucket = []byte("notificationRulev1") - - // ErrNotificationRuleNotFound is used when the notification rule is not found. - ErrNotificationRuleNotFound = &influxdb.Error{ - Msg: "notification rule not found", - Code: influxdb.ENotFound, - } - - // ErrInvalidNotificationRuleID is used when the service was provided - // an invalid ID format. - ErrInvalidNotificationRuleID = &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "provided notification rule ID has invalid format", - } -) - -var _ influxdb.NotificationRuleStore = (*Service)(nil) - -// UnavailableNotificationRuleStoreError is used if we aren't able to interact with the -// store, it means the store is not available at the moment (e.g. network). -func UnavailableNotificationRuleStoreError(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInternal, - Msg: fmt.Sprintf("Unable to connect to notification rule store service. Please try again; Err: %v", err), - Op: "kv/notificationRule", - } -} - -// InternalNotificationRuleStoreError is used when the error comes from an -// internal system. -func InternalNotificationRuleStoreError(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInternal, - Msg: fmt.Sprintf("Unknown internal notificationRule data error; Err: %v", err), - Op: "kv/notificationRule", - } -} - -func (s *Service) notificationRuleBucket(tx Tx) (Bucket, error) { - b, err := tx.Bucket(notificationRuleBucket) - if err != nil { - return nil, UnavailableNotificationRuleStoreError(err) - } - return b, nil -} - -// CreateNotificationRule creates a new notification rule and sets b.ID with the new identifier. -func (s *Service) CreateNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { - return s.kv.Update(ctx, func(tx Tx) error { - return s.createNotificationRule(ctx, tx, nr, userID) - }) -} - -func (s *Service) createNotificationRule(ctx context.Context, tx Tx, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { - id := s.IDGenerator.ID() - nr.SetID(id) - now := s.TimeGenerator.Now() - nr.SetOwnerID(userID) - nr.SetCreatedAt(now) - nr.SetUpdatedAt(now) - - t, err := s.createNotificationTask(ctx, tx, nr) - if err != nil { - return err - } - - nr.SetTaskID(t.ID) - - if err := nr.Valid(); err != nil { - return err - } - - if err := nr.Status.Valid(); err != nil { - return err - } - - if err := s.putNotificationRule(ctx, tx, nr.NotificationRule); err != nil { - return err - } - - urm := &influxdb.UserResourceMapping{ - ResourceID: id, - UserID: userID, - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - } - return s.createUserResourceMapping(ctx, tx, urm) -} - -func (s *Service) createNotificationTask(ctx context.Context, tx Tx, r influxdb.NotificationRuleCreate) (*influxdb.Task, error) { - ep, err := s.findNotificationEndpointByID(ctx, tx, r.GetEndpointID()) - if err != nil { - return nil, err - } - - script, err := r.GenerateFlux(ep) - if err != nil { - return nil, err - } - - status := string(r.Status) - - tc := influxdb.TaskCreate{ - Type: r.Type(), - Flux: script, - OwnerID: r.GetOwnerID(), - OrganizationID: r.GetOrgID(), - Status: status, - } - - t, err := s.createTask(ctx, tx, tc) - if err != nil { - return nil, err - } - - return t, nil -} - -func (s *Service) updateNotificationTask(ctx context.Context, tx Tx, r influxdb.NotificationRule, status *string) (*influxdb.Task, error) { - ep, err := s.findNotificationEndpointByID(ctx, tx, r.GetEndpointID()) - if err != nil { - return nil, err - } - - script, err := r.GenerateFlux(ep) - if err != nil { - return nil, err - } - - tu := influxdb.TaskUpdate{ - Flux: &script, - Description: strPtr(r.GetDescription()), - Status: status, - } - - t, err := s.updateTask(ctx, tx, r.GetTaskID(), tu) - if err != nil { - return nil, err - } - - return t, nil -} - -// UpdateNotificationRule updates a single notification rule. -// Returns the new notification rule after update. -func (s *Service) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) { - var err error - var rule influxdb.NotificationRule - - err = s.kv.Update(ctx, func(tx Tx) error { - rule, err = s.updateNotificationRule(ctx, tx, id, nr, userID) - return err - }) - - return rule, err -} - -func (s *Service) updateNotificationRule(ctx context.Context, tx Tx, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) { - - current, err := s.findNotificationRuleByID(ctx, tx, id) - if err != nil { - return nil, err - } - - // ID and OrganizationID can not be updated - nr.SetID(current.GetID()) - nr.SetOrgID(current.GetOrgID()) - nr.SetOwnerID(current.GetOwnerID()) - nr.SetCreatedAt(current.GetCRUDLog().CreatedAt) - nr.SetUpdatedAt(s.TimeGenerator.Now()) - nr.SetTaskID(current.GetTaskID()) - - if err := nr.Valid(); err != nil { - return nil, err - } - - if err := nr.Status.Valid(); err != nil { - return nil, err - } - - _, err = s.updateNotificationTask(ctx, tx, nr, strPtr(string(nr.Status))) - if err != nil { - return nil, err - } - - if err := s.putNotificationRule(ctx, tx, nr.NotificationRule); err != nil { - return nil, err - } - - return nr.NotificationRule, nil -} - -// PatchNotificationRule updates a single notification rule with changeset. -// Returns the new notification rule state after update. -func (s *Service) PatchNotificationRule(ctx context.Context, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { - var nr influxdb.NotificationRule - if err := s.kv.Update(ctx, func(tx Tx) (err error) { - nr, err = s.patchNotificationRule(ctx, tx, id, upd) - if err != nil { - return err - } - return nil - }); err != nil { - return nil, err - } - - return nr, nil -} - -func (s *Service) patchNotificationRule(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { - nr, err := s.findNotificationRuleByID(ctx, tx, id) - if err != nil { - return nil, err - } - - if upd.Name != nil { - nr.SetName(*upd.Name) - } - if upd.Description != nil { - nr.SetDescription(*upd.Description) - } - var status *string - if upd.Status != nil { - status = strPtr(string(*upd.Status)) - } - - nr.SetUpdatedAt(s.TimeGenerator.Now()) - - if err := nr.Valid(); err != nil { - return nil, err - } - - _, err = s.updateNotificationTask(ctx, tx, nr, status) - if err != nil { - return nil, err - } - - err = s.putNotificationRule(ctx, tx, nr) - if err != nil { - return nil, err - } - - return nr, nil -} - -// PutNotificationRule put a notification rule to storage. -func (s *Service) PutNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate) error { - return s.kv.Update(ctx, func(tx Tx) (err error) { - if err := nr.Valid(); err != nil { - return err - } - - if err := nr.Status.Valid(); err != nil { - return err - } - - return s.putNotificationRule(ctx, tx, nr) - }) -} - -func (s *Service) putNotificationRule(ctx context.Context, tx Tx, nr influxdb.NotificationRule) error { - encodedID, _ := nr.GetID().Encode() - - v, err := json.Marshal(nr) - if err != nil { - return err - } - - bucket, err := s.notificationRuleBucket(tx) - if err != nil { - return err - } - - if err := bucket.Put(encodedID, v); err != nil { - return UnavailableNotificationRuleStoreError(err) - } - return nil -} - -// FindNotificationRuleByID returns a single notification rule by ID. -func (s *Service) FindNotificationRuleByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationRule, error) { - var ( - nr influxdb.NotificationRule - err error - ) - - err = s.kv.View(ctx, func(tx Tx) error { - nr, err = s.findNotificationRuleByID(ctx, tx, id) - return err - }) - - return nr, err -} - -func (s *Service) findNotificationRuleByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.NotificationRule, error) { - encID, err := id.Encode() - if err != nil { - return nil, ErrInvalidNotificationRuleID - } - - bucket, err := s.notificationRuleBucket(tx) - if err != nil { - return nil, err - } - - v, err := bucket.Get(encID) - if IsNotFound(err) { - return nil, ErrNotificationRuleNotFound - } - if err != nil { - return nil, InternalNotificationRuleStoreError(err) - } - - return rule.UnmarshalJSON(v) -} - -// FindNotificationRules returns a list of notification rules that match filter and the total count of matching notification rules. -// Additional options provide pagination & sorting. -func (s *Service) FindNotificationRules(ctx context.Context, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) (nrs []influxdb.NotificationRule, n int, err error) { - err = s.kv.View(ctx, func(tx Tx) error { - nrs, n, err = s.findNotificationRules(ctx, tx, filter, opt...) - return err - }) - return nrs, n, err -} - -func (s *Service) findNotificationRules(ctx context.Context, tx Tx, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationRule, int, error) { - nrs := make([]influxdb.NotificationRule, 0) - - m, err := s.findUserResourceMappings(ctx, tx, filter.UserResourceMappingFilter) - if err != nil { - return nil, 0, err - } - - if len(m) == 0 { - return nrs, 0, nil - } - - idMap := make(map[influxdb.ID]bool) - for _, item := range m { - idMap[item.ResourceID] = false - } - - if filter.OrgID != nil || filter.Organization != nil { - o, err := s.findOrganization(ctx, tx, influxdb.OrganizationFilter{ - ID: filter.OrgID, - Name: filter.Organization, - }) - - if err != nil { - return nrs, 0, err - } - filter.OrgID = &o.ID - } - - var offset, limit, count int - var descending bool - if len(opt) > 0 { - offset = opt[0].Offset - limit = opt[0].Limit - descending = opt[0].Descending - } - filterFn := filterNotificationRulesFn(idMap, filter) - err = s.forEachNotificationRule(ctx, tx, descending, func(nr influxdb.NotificationRule) bool { - if filterFn(nr) { - if count >= offset { - nrs = append(nrs, nr) - } - count++ - } - - if limit > 0 && len(nrs) >= limit { - return false - } - - return true - }) - - return nrs, len(nrs), err -} - -// forEachNotificationRule will iterate through all notification rules while fn returns true. -func (s *Service) forEachNotificationRule(ctx context.Context, tx Tx, descending bool, fn func(influxdb.NotificationRule) bool) error { - - bkt, err := s.notificationRuleBucket(tx) - if err != nil { - return err - } - - direction := CursorAscending - if descending { - direction = CursorDescending - } - - cur, err := bkt.ForwardCursor(nil, WithCursorDirection(direction)) - if err != nil { - return err - } - - for k, v := cur.Next(); k != nil; k, v = cur.Next() { - nr, err := rule.UnmarshalJSON(v) - if err != nil { - return err - } - if !fn(nr) { - break - } - } - - return nil -} - -func filterNotificationRulesFn(idMap map[influxdb.ID]bool, filter influxdb.NotificationRuleFilter) func(nr influxdb.NotificationRule) bool { - if filter.OrgID != nil { - return func(nr influxdb.NotificationRule) bool { - if !nr.MatchesTags(filter.Tags) { - return false - } - - _, ok := idMap[nr.GetID()] - return nr.GetOrgID() == *filter.OrgID && ok - } - } - - return func(nr influxdb.NotificationRule) bool { - if !nr.MatchesTags(filter.Tags) { - return false - } - - _, ok := idMap[nr.GetID()] - return ok - } -} - -// DeleteNotificationRule removes a notification rule by ID. -func (s *Service) DeleteNotificationRule(ctx context.Context, id influxdb.ID) error { - return s.kv.Update(ctx, func(tx Tx) error { - return s.deleteNotificationRule(ctx, tx, id) - }) -} - -func (s *Service) deleteNotificationRule(ctx context.Context, tx Tx, id influxdb.ID) error { - r, err := s.findNotificationRuleByID(ctx, tx, id) - if err != nil { - return err - } - - if err := s.deleteTask(ctx, tx, r.GetTaskID()); err != nil { - return err - } - - encodedID, err := id.Encode() - if err != nil { - return ErrInvalidNotificationRuleID - } - - bucket, err := s.notificationRuleBucket(tx) - if err != nil { - return err - } - - _, err = bucket.Get(encodedID) - if IsNotFound(err) { - return ErrNotificationRuleNotFound - } - if err != nil { - return InternalNotificationRuleStoreError(err) - } - - if err := bucket.Delete(encodedID); err != nil { - return InternalNotificationRuleStoreError(err) - } - - if err := s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{ - ResourceID: id, - ResourceType: influxdb.NotificationRuleResourceType, - }); err != nil { - // TODO(desa): it is possible that there were no user resource mappings for a resource so this likely shouldn't be a blocking - // condition for deleting a notification rule. - s.log.Info("Failed to remove user resource mappings for notification rule", zap.Error(err), zap.Stringer("rule_id", id)) - } - - return nil -} diff --git a/kv/notification_rule_test.go b/kv/notification_rule_test.go deleted file mode 100644 index fdb054ef83e..00000000000 --- a/kv/notification_rule_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package kv_test - -import ( - "context" - "testing" - - "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kv" - "github.com/influxdata/influxdb/v2/query/fluxlang" - influxdbtesting "github.com/influxdata/influxdb/v2/testing" - "go.uber.org/zap/zaptest" -) - -func TestBoltNotificationRuleStore(t *testing.T) { - influxdbtesting.NotificationRuleStore(initBoltNotificationRuleStore, t) -} - -func initBoltNotificationRuleStore(f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) { - s, closeBolt, err := NewTestBoltStore(t) - if err != nil { - t.Fatalf("failed to create new kv store: %v", err) - } - - svc, closeSvc := initNotificationRuleStore(s, f, t) - return svc, func() { - closeSvc() - closeBolt() - } -} - -func initNotificationRuleStore(s kv.SchemaStore, f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) { - ctx := context.Background() - svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{ - FluxLanguageService: fluxlang.DefaultService, - }) - svc.IDGenerator = f.IDGenerator - svc.TimeGenerator = f.TimeGenerator - if f.TimeGenerator == nil { - svc.TimeGenerator = influxdb.RealTimeGenerator{} - } - - for _, o := range f.Orgs { - if err := svc.PutOrganization(ctx, o); err != nil { - t.Fatalf("failed to populate org: %v", err) - } - } - - for _, m := range f.UserResourceMappings { - if err := svc.CreateUserResourceMapping(ctx, m); err != nil { - t.Fatalf("failed to populate user resource mapping: %v", err) - } - } - - for _, e := range f.Endpoints { - if err := svc.CreateNotificationEndpoint(ctx, e, 1); err != nil { - t.Fatalf("failed to populate notification endpoint: %v", err) - } - } - - for _, nr := range f.NotificationRules { - nrc := influxdb.NotificationRuleCreate{ - NotificationRule: nr, - Status: influxdb.Active, - } - if err := svc.PutNotificationRule(ctx, nrc); err != nil { - t.Fatalf("failed to populate notification rule: %v", err) - } - } - - for _, c := range f.Tasks { - if _, err := svc.CreateTask(ctx, c); err != nil { - t.Fatalf("failed to populate task: %v", err) - } - } - - return svc, func() { - for _, nr := range f.NotificationRules { - if err := svc.DeleteNotificationRule(ctx, nr.GetID()); err != nil { - t.Logf("failed to remove notification rule: %v", err) - } - } - for _, urm := range f.UserResourceMappings { - if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound { - t.Logf("failed to remove urm rule: %v", err) - } - } - for _, o := range f.Orgs { - if err := svc.DeleteOrganization(ctx, o.ID); err != nil { - t.Fatalf("failed to remove org: %v", err) - } - } - } -} diff --git a/kv/org.go b/kv/org.go index 0af68c7f6d7..ef6720e5de5 100644 --- a/kv/org.go +++ b/kv/org.go @@ -717,88 +717,6 @@ func (s *Service) appendOrganizationEventToLog(ctx context.Context, tx Tx, id in return s.addLogEntry(ctx, tx, k, v, s.Now()) } -// FindResourceOrganizationID is used to find the organization that a resource belongs to five the id of a resource and a resource type. -func (s *Service) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) { - switch rt { - case influxdb.AuthorizationsResourceType: - r, err := s.FindAuthorizationByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.BucketsResourceType: - r, err := s.FindBucketByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.OrgsResourceType: - r, err := s.FindOrganizationByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.ID, nil - case influxdb.DashboardsResourceType: - r, err := s.FindDashboardByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.SourcesResourceType: - r, err := s.FindSourceByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.TasksResourceType: - r, err := s.FindTaskByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.TelegrafsResourceType: - r, err := s.FindTelegrafConfigByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.VariablesResourceType: - r, err := s.FindVariableByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.ScraperResourceType: - r, err := s.GetTargetByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.ChecksResourceType: - r, err := s.FindCheckByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.GetOrgID(), nil - case influxdb.NotificationEndpointResourceType: - r, err := s.FindNotificationEndpointByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.GetOrgID(), nil - case influxdb.NotificationRuleResourceType: - r, err := s.FindNotificationRuleByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.GetOrgID(), nil - } - - return influxdb.InvalidID(), &influxdb.Error{ - Msg: fmt.Sprintf("unsupported resource type %s", rt), - } -} - // OrgAlreadyExistsError is used when creating a new organization with // a name that has already been used. Organization names must be unique. func OrgAlreadyExistsError(o *influxdb.Organization) error { diff --git a/label/middleware_auth.go b/label/middleware_auth.go index 2c117aaa035..bbb68b3fedf 100644 --- a/label/middleware_auth.go +++ b/label/middleware_auth.go @@ -2,7 +2,6 @@ package label import ( "context" - "errors" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/authorizer" @@ -11,15 +10,15 @@ import ( var _ influxdb.LabelService = (*AuthedLabelService)(nil) type AuthedLabelService struct { - s influxdb.LabelService - orgSvc authorizer.OrganizationService + s influxdb.LabelService + orgIDResolver authorizer.OrgIDResolver } // NewAuthedLabelService constructs an instance of an authorizing label serivce. -func NewAuthedLabelService(s influxdb.LabelService, orgSvc authorizer.OrganizationService) *AuthedLabelService { +func NewAuthedLabelService(s influxdb.LabelService, orgIDResolver authorizer.OrgIDResolver) *AuthedLabelService { return &AuthedLabelService{ - s: s, - orgSvc: orgSvc, + s: s, + orgIDResolver: orgIDResolver, } } func (s *AuthedLabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error { @@ -59,10 +58,7 @@ func (s *AuthedLabelService) FindResourceLabels(ctx context.Context, filter infl return nil, err } - if s.orgSvc == nil { - return nil, errors.New("failed to find orgSvc") - } - orgID, err := s.orgSvc.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) if err != nil { return nil, err } diff --git a/notification/rule/service/service.go b/notification/rule/service/service.go new file mode 100644 index 00000000000..de4aa40d833 --- /dev/null +++ b/notification/rule/service/service.go @@ -0,0 +1,497 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/notification/rule" + "github.com/influxdata/influxdb/v2/pkg/pointer" + "github.com/influxdata/influxdb/v2/snowflake" + "go.uber.org/zap" +) + +var ( + notificationRuleBucket = []byte("notificationRulev1") + + // ErrNotificationRuleNotFound is used when the notification rule is not found. + ErrNotificationRuleNotFound = &influxdb.Error{ + Msg: "notification rule not found", + Code: influxdb.ENotFound, + } + + // ErrInvalidNotificationRuleID is used when the service was provided + // an invalid ID format. + ErrInvalidNotificationRuleID = &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "provided notification rule ID has invalid format", + } +) + +// RuleService is an implementation of the influxdb CheckService +// It is backed by the kv store abstraction. +type RuleService struct { + log *zap.Logger + + kv kv.Store + tasks influxdb.TaskService + orgs influxdb.OrganizationService + endpoints influxdb.NotificationEndpointService + + idGenerator influxdb.IDGenerator + timeGenerator influxdb.TimeGenerator +} + +// NewRuleService constructs and configures a notification rule service +func NewRuleService(logger *zap.Logger, store kv.Store, tasks influxdb.TaskService, orgs influxdb.OrganizationService, endpoints influxdb.NotificationEndpointService) (*RuleService, error) { + s := &RuleService{ + log: logger, + kv: store, + tasks: tasks, + orgs: orgs, + endpoints: endpoints, + timeGenerator: influxdb.RealTimeGenerator{}, + idGenerator: snowflake.NewIDGenerator(), + } + + ctx := context.Background() + if err := store.Update(ctx, func(tx kv.Tx) error { + return s.initializeNotificationRule(ctx, tx) + }); err != nil { + return nil, err + } + + return s, nil +} + +var _ influxdb.NotificationRuleStore = (*RuleService)(nil) + +func (s *RuleService) initializeNotificationRule(ctx context.Context, tx kv.Tx) error { + if _, err := s.notificationRuleBucket(tx); err != nil { + return err + } + return nil +} + +// UnavailableNotificationRuleStoreError is used if we aren't able to interact with the +// store, it means the store is not available at the moment (e.g. network). +func UnavailableNotificationRuleStoreError(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unable to connect to notification rule store service. Please try again; Err: %v", err), + Op: "kv/notificationRule", + } +} + +// InternalNotificationRuleStoreError is used when the error comes from an +// internal system. +func InternalNotificationRuleStoreError(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unknown internal notificationRule data error; Err: %v", err), + Op: "kv/notificationRule", + } +} + +func (s *RuleService) notificationRuleBucket(tx kv.Tx) (kv.Bucket, error) { + b, err := tx.Bucket(notificationRuleBucket) + if err != nil { + return nil, UnavailableNotificationRuleStoreError(err) + } + return b, nil +} + +// CreateNotificationRule creates a new notification rule and sets b.ID with the new identifier. +func (s *RuleService) CreateNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { + // set notification rule ID + id := s.idGenerator.ID() + nr.SetID(id) + + // set notification rule created / updated times + now := s.timeGenerator.Now() + nr.SetOwnerID(userID) + nr.SetCreatedAt(now) + nr.SetUpdatedAt(now) + + // create backing task and set ID (in inactive state initially) + t, err := s.createNotificationTask(ctx, nr) + if err != nil { + return err + } + + nr.SetTaskID(t.ID) + + if err := s.kv.Update(ctx, func(tx kv.Tx) error { + return s.createNotificationRule(ctx, tx, nr, userID) + }); err != nil { + // remove associated task + if derr := s.tasks.DeleteTask(ctx, t.ID); derr != nil { + s.log.Error("failed to remove task for invalid notification rule", zap.Error(derr)) + } + + return err + } + + // set task to notification rule create status + _, err = s.tasks.UpdateTask(ctx, t.ID, influxdb.TaskUpdate{Status: pointer.String(string(nr.Status))}) + return err +} + +func (s *RuleService) createNotificationRule(ctx context.Context, tx kv.Tx, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { + if err := nr.Valid(); err != nil { + return err + } + + if err := nr.Status.Valid(); err != nil { + return err + } + + return s.putNotificationRule(ctx, tx, nr.NotificationRule) +} + +func (s *RuleService) createNotificationTask(ctx context.Context, r influxdb.NotificationRuleCreate) (*influxdb.Task, error) { + ep, err := s.endpoints.FindNotificationEndpointByID(ctx, r.GetEndpointID()) + if err != nil { + return nil, err + } + + script, err := r.GenerateFlux(ep) + if err != nil { + return nil, err + } + + tc := influxdb.TaskCreate{ + Type: r.Type(), + Flux: script, + OwnerID: r.GetOwnerID(), + OrganizationID: r.GetOrgID(), + // create task initially in inactive status + Status: string(influxdb.Inactive), + } + + t, err := s.tasks.CreateTask(ctx, tc) + if err != nil { + return nil, err + } + + return t, nil +} + +// UpdateNotificationRule updates a single notification rule. +// Returns the new notification rule after update. +func (s *RuleService) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) { + rule, err := s.FindNotificationRuleByID(ctx, id) + if err != nil { + return nil, err + } + + // ID and OrganizationID can not be updated + nr.SetID(rule.GetID()) + nr.SetOrgID(rule.GetOrgID()) + nr.SetOwnerID(rule.GetOwnerID()) + nr.SetCreatedAt(rule.GetCRUDLog().CreatedAt) + nr.SetUpdatedAt(s.timeGenerator.Now()) + nr.SetTaskID(rule.GetTaskID()) + + if err := nr.Valid(); err != nil { + return nil, err + } + + if err := nr.Status.Valid(); err != nil { + return nil, err + } + + _, err = s.updateNotificationTask(ctx, nr, pointer.String(string(nr.Status))) + if err != nil { + return nil, err + } + + err = s.kv.Update(ctx, func(tx kv.Tx) error { + return s.putNotificationRule(ctx, tx, nr.NotificationRule) + }) + + return nr.NotificationRule, err +} + +func (s *RuleService) updateNotificationTask(ctx context.Context, r influxdb.NotificationRule, status *string) (*influxdb.Task, error) { + ep, err := s.endpoints.FindNotificationEndpointByID(ctx, r.GetEndpointID()) + if err != nil { + return nil, err + } + + script, err := r.GenerateFlux(ep) + if err != nil { + return nil, err + } + + tu := influxdb.TaskUpdate{ + Flux: &script, + Description: pointer.String(r.GetDescription()), + Status: status, + } + + t, err := s.tasks.UpdateTask(ctx, r.GetTaskID(), tu) + if err != nil { + return nil, err + } + + return t, nil +} + +// PatchNotificationRule updates a single notification rule with changeset. +// Returns the new notification rule state after update. +func (s *RuleService) PatchNotificationRule(ctx context.Context, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { + nr, err := s.FindNotificationRuleByID(ctx, id) + if err != nil { + return nil, err + } + + if upd.Name != nil { + nr.SetName(*upd.Name) + } + if upd.Description != nil { + nr.SetDescription(*upd.Description) + } + + var status *string + if upd.Status != nil { + status = pointer.String(string(*upd.Status)) + } + + nr.SetUpdatedAt(s.timeGenerator.Now()) + if err := nr.Valid(); err != nil { + return nil, err + } + + _, err = s.updateNotificationTask(ctx, nr, status) + if err != nil { + return nil, err + } + + if err := s.kv.Update(ctx, func(tx kv.Tx) (err error) { + return s.putNotificationRule(ctx, tx, nr) + }); err != nil { + return nil, err + } + + return nr, nil +} + +// PutNotificationRule put a notification rule to storage. +func (s *RuleService) PutNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate) error { + return s.kv.Update(ctx, func(tx kv.Tx) (err error) { + if err := nr.Valid(); err != nil { + return err + } + + if err := nr.Status.Valid(); err != nil { + return err + } + + return s.putNotificationRule(ctx, tx, nr) + }) +} + +func (s *RuleService) putNotificationRule(ctx context.Context, tx kv.Tx, nr influxdb.NotificationRule) error { + encodedID, _ := nr.GetID().Encode() + + v, err := json.Marshal(nr) + if err != nil { + return err + } + + bucket, err := s.notificationRuleBucket(tx) + if err != nil { + return err + } + + if err := bucket.Put(encodedID, v); err != nil { + return UnavailableNotificationRuleStoreError(err) + } + return nil +} + +// FindNotificationRuleByID returns a single notification rule by ID. +func (s *RuleService) FindNotificationRuleByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationRule, error) { + var ( + nr influxdb.NotificationRule + err error + ) + + err = s.kv.View(ctx, func(tx kv.Tx) error { + nr, err = s.findNotificationRuleByID(ctx, tx, id) + return err + }) + + return nr, err +} + +func (s *RuleService) findNotificationRuleByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (influxdb.NotificationRule, error) { + encID, err := id.Encode() + if err != nil { + return nil, ErrInvalidNotificationRuleID + } + + bucket, err := s.notificationRuleBucket(tx) + if err != nil { + return nil, err + } + + v, err := bucket.Get(encID) + if kv.IsNotFound(err) { + return nil, ErrNotificationRuleNotFound + } + if err != nil { + return nil, InternalNotificationRuleStoreError(err) + } + + return rule.UnmarshalJSON(v) +} + +// FindNotificationRules returns a list of notification rules that match filter and the total count of matching notification rules. +// Additional options provide pagination & sorting. +func (s *RuleService) FindNotificationRules(ctx context.Context, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) (nrs []influxdb.NotificationRule, n int, err error) { + if filter.OrgID == nil && filter.Organization != nil { + o, err := s.orgs.FindOrganization(ctx, influxdb.OrganizationFilter{ + Name: filter.Organization, + }) + + if err != nil { + return nrs, 0, err + } + + filter.OrgID = &o.ID + } + + err = s.kv.View(ctx, func(tx kv.Tx) error { + nrs, n, err = s.findNotificationRules(ctx, tx, filter, opt...) + return err + }) + + return nrs, n, err +} + +func (s *RuleService) findNotificationRules(ctx context.Context, tx kv.Tx, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationRule, int, error) { + var ( + nrs = make([]influxdb.NotificationRule, 0) + offset int + limit int + count int + descending bool + ) + + if len(opt) > 0 { + offset = opt[0].Offset + limit = opt[0].Limit + descending = opt[0].Descending + } + + filterFn := filterNotificationRulesFn(filter) + err := s.forEachNotificationRule(ctx, tx, descending, func(nr influxdb.NotificationRule) bool { + if filterFn(nr) { + if count >= offset { + nrs = append(nrs, nr) + } + count++ + } + + if limit > 0 && len(nrs) >= limit { + return false + } + + return true + }) + + return nrs, len(nrs), err +} + +// forEachNotificationRule will iterate through all notification rules while fn returns true. +func (s *RuleService) forEachNotificationRule(ctx context.Context, tx kv.Tx, descending bool, fn func(influxdb.NotificationRule) bool) error { + + bkt, err := s.notificationRuleBucket(tx) + if err != nil { + return err + } + + direction := kv.CursorAscending + if descending { + direction = kv.CursorDescending + } + + cur, err := bkt.ForwardCursor(nil, kv.WithCursorDirection(direction)) + if err != nil { + return err + } + + for k, v := cur.Next(); k != nil; k, v = cur.Next() { + nr, err := rule.UnmarshalJSON(v) + if err != nil { + return err + } + if !fn(nr) { + break + } + } + + return nil +} + +func filterNotificationRulesFn(filter influxdb.NotificationRuleFilter) func(nr influxdb.NotificationRule) bool { + if filter.OrgID != nil { + return func(nr influxdb.NotificationRule) bool { + if !nr.MatchesTags(filter.Tags) { + return false + } + + return nr.GetOrgID() == *filter.OrgID + } + } + + return func(nr influxdb.NotificationRule) bool { + return nr.MatchesTags(filter.Tags) + } +} + +// DeleteNotificationRule removes a notification rule by ID. +func (s *RuleService) DeleteNotificationRule(ctx context.Context, id influxdb.ID) error { + r, err := s.FindNotificationRuleByID(ctx, id) + if err != nil { + return err + } + + if err := s.tasks.DeleteTask(ctx, r.GetTaskID()); err != nil { + return err + } + + return s.kv.Update(ctx, func(tx kv.Tx) error { + return s.deleteNotificationRule(ctx, tx, r) + }) +} + +func (s *RuleService) deleteNotificationRule(ctx context.Context, tx kv.Tx, r influxdb.NotificationRule) error { + encodedID, err := r.GetID().Encode() + if err != nil { + return ErrInvalidNotificationRuleID + } + + bucket, err := s.notificationRuleBucket(tx) + if err != nil { + return err + } + + _, err = bucket.Get(encodedID) + if kv.IsNotFound(err) { + return ErrNotificationRuleNotFound + } + if err != nil { + return InternalNotificationRuleStoreError(err) + } + + if err := bucket.Delete(encodedID); err != nil { + return InternalNotificationRuleStoreError(err) + } + + return nil +} diff --git a/testing/notification_rule.go b/notification/rule/service/service_external_test.go similarity index 66% rename from testing/notification_rule.go rename to notification/rule/service/service_external_test.go index c4db726efae..97f0c1d050c 100644 --- a/testing/notification_rule.go +++ b/notification/rule/service/service_external_test.go @@ -1,28 +1,49 @@ -package testing +package service import ( + "bytes" "context" + "reflect" "sort" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/influxdata/influxdb/v2" + "github.com/influxdata/flux/ast" + influxdb "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/endpoint" "github.com/influxdata/influxdb/v2/notification/rule" + "github.com/influxdata/influxdb/v2/pkg/pointer" +) + +const ( + oneID = "020f755c3c082000" + twoID = "020f755c3c082001" + threeID = "020f755c3c082002" + fourID = "020f755c3c082003" + fiveID = "020f755c3c082004" + sixID = "020f755c3c082005" +) + +var ( + fakeDate = time.Date(2006, 5, 4, 1, 2, 3, 0, time.UTC) + fakeGenerator = mock.TimeGenerator{FakeValue: fakeDate} + timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)} + timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)} + time3 = time.Date(2006, time.July, 15, 5, 23, 53, 10, time.UTC) ) // NotificationRuleFields includes prepopulated data for mapping tests. type NotificationRuleFields struct { - IDGenerator influxdb.IDGenerator - TimeGenerator influxdb.TimeGenerator - NotificationRules []influxdb.NotificationRule - Orgs []*influxdb.Organization - UserResourceMappings []*influxdb.UserResourceMapping - Tasks []influxdb.TaskCreate - Endpoints []influxdb.NotificationEndpoint + IDGenerator influxdb.IDGenerator + TimeGenerator influxdb.TimeGenerator + NotificationRules []influxdb.NotificationRule + Orgs []*influxdb.Organization + Tasks []influxdb.TaskCreate + Endpoints []influxdb.NotificationEndpoint } var notificationRuleCmpOptions = cmp.Options{ @@ -36,14 +57,36 @@ var notificationRuleCmpOptions = cmp.Options{ }), } +var taskCmpOptions = cmp.Options{ + cmp.Comparer(func(x, y []byte) bool { + return bytes.Equal(x, y) + }), + // skip comparing permissions + cmpopts.IgnoreFields( + influxdb.Task{}, + "LatestCompleted", + "LatestScheduled", + "CreatedAt", + "UpdatedAt", + ), + cmp.Transformer("Sort", func(in []*influxdb.Task) []*influxdb.Task { + out := append([]*influxdb.Task{}, in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID > out[j].ID + }) + return out + }), +} + +type notificationRuleFactory func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) + // NotificationRuleStore tests all the service functions. func NotificationRuleStore( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), t *testing.T, + init notificationRuleFactory, t *testing.T, ) { tests := []struct { name string - fn func(init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), - t *testing.T) + fn func(notificationRuleFactory, *testing.T) }{ { name: "CreateNotificationRule", @@ -72,8 +115,6 @@ func NotificationRuleStore( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt := tt - t.Parallel() tt.fn(init, t) }) } @@ -81,7 +122,7 @@ func NotificationRuleStore( // CreateNotificationRule testing. func CreateNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -89,9 +130,9 @@ func CreateNotificationRule( userID influxdb.ID } type wants struct { - err error - notificationRules []influxdb.NotificationRule - userResourceMapping []*influxdb.UserResourceMapping + err error + notificationRule influxdb.NotificationRule + task *influxdb.Task } tests := []struct { @@ -117,7 +158,7 @@ func CreateNotificationRule( Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -167,14 +208,6 @@ func CreateNotificationRule( MessageTemplate: "msg1", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - }, }, args: args{ userID: MustIDBase16(sixID), @@ -213,7 +246,84 @@ func CreateNotificationRule( }, }, wants: wants{ - notificationRules: []influxdb.NotificationRule{ + notificationRule: &rule.Slack{ + Base: rule.Base{ + ID: MustIDBase16(twoID), + Name: "name2", + OwnerID: MustIDBase16(sixID), + OrgID: MustIDBase16(fourID), + EndpointID: MustIDBase16(twoID), + RunbookLink: "runbooklink1", + SleepUntil: &time3, + Every: mustDuration("1h"), + StatusRules: []notification.StatusRule{ + { + CurrentLevel: notification.Critical, + }, + }, + TagRules: []notification.TagRule{ + { + Tag: influxdb.Tag{ + Key: "k1", + Value: "v1", + }, + Operator: influxdb.NotEqual, + }, + { + Tag: influxdb.Tag{ + Key: "k2", + Value: "v2", + }, + Operator: influxdb.RegexEqual, + }, + }, + CRUDLog: influxdb.CRUDLog{ + CreatedAt: fakeDate, + UpdatedAt: fakeDate, + }, + }, + MessageTemplate: "msg1", + }, + task: &influxdb.Task{ + ID: MustIDBase16("020f755c3c082001"), + Type: "slack", + OrganizationID: MustIDBase16("020f755c3c082003"), + Organization: "org", + OwnerID: MustIDBase16("020f755c3c082005"), + Name: "name2", + Status: "active", + Flux: "package main\n// name2\nimport \"influxdata/influxdb/monitor\"\nimport \"slack\"\nimport \"influxdata/influxdb/secrets\"\nimport \"experimental\"\n\noption task = {name: \"name2\", every: 1h}\n\nslack_secret = secrets[\"get\"](key: \"020f755c3c082001-token\")\nslack_endpoint = slack[\"endpoint\"](token: slack_secret, url: \"http://localhost:7777\")\nnotification = {\n\t_notification_rule_id: \"020f755c3c082001\",\n\t_notification_rule_name: \"name2\",\n\t_notification_endpoint_id: \"020f755c3c082001\",\n\t_notification_endpoint_name: \"foo\",\n}\nstatuses = monitor[\"from\"](start: -2h, fn: (r) =>\n\t(r[\"k1\"] == \"v1\" and r[\"k2\"] == \"v2\"))\ncrit = statuses\n\t|> filter(fn: (r) =>\n\t\t(r[\"_level\"] == \"crit\"))\nall_statuses = crit\n\t|> filter(fn: (r) =>\n\t\t(r[\"_time\"] >= experimental[\"subDuration\"](from: now(), d: 1h)))\n\nall_statuses\n\t|> monitor[\"notify\"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>\n\t\t({channel: \"\", text: \"msg1\", color: if r[\"_level\"] == \"crit\" then \"danger\" else if r[\"_level\"] == \"warn\" then \"warning\" else \"good\"})))", + Every: "1h", + }, + }, + }, + { + name: "invalid tag rule value", + fields: NotificationRuleFields{ + IDGenerator: mock.NewIDGenerator(twoID, t), + TimeGenerator: fakeGenerator, + Orgs: []*influxdb.Organization{ + { + Name: "org", + ID: MustIDBase16(fourID), + }, + }, + Endpoints: []influxdb.NotificationEndpoint{ + &endpoint.Slack{ + URL: "http://localhost:7777", + Token: influxdb.SecretField{ + // TODO(desa): not sure why this has to end in token, but it does + Key: "020f755c3c082001-token", + Value: pointer.String("abc123"), + }, + Base: endpoint.Base{ + OrgID: MustIDBase16Ptr(fourID), + Name: "foo", + Status: influxdb.Active, + }, + }, + }, + NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ ID: MustIDBase16(oneID), @@ -253,58 +363,49 @@ func CreateNotificationRule( Channel: "channel1", MessageTemplate: "msg1", }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(twoID), - Name: "name2", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink1", - SleepUntil: &time3, - Every: mustDuration("1h"), - StatusRules: []notification.StatusRule{ - { - CurrentLevel: notification.Critical, - }, + }, + }, + args: args{ + userID: MustIDBase16(sixID), + notificationRule: &rule.Slack{ + Base: rule.Base{ + OwnerID: MustIDBase16(sixID), + OrgID: MustIDBase16(fourID), + Name: "name2", + EndpointID: MustIDBase16(twoID), + RunbookLink: "runbooklink1", + SleepUntil: &time3, + Every: mustDuration("1h"), + StatusRules: []notification.StatusRule{ + { + CurrentLevel: notification.Critical, }, - TagRules: []notification.TagRule{ - { - Tag: influxdb.Tag{ - Key: "k1", - Value: "v1", - }, - Operator: influxdb.NotEqual, - }, - { - Tag: influxdb.Tag{ - Key: "k2", - Value: "v2", - }, - Operator: influxdb.RegexEqual, + }, + TagRules: []notification.TagRule{ + { + Tag: influxdb.Tag{ + Key: "k1", + Value: "v1", }, + Operator: influxdb.NotEqual, }, - CRUDLog: influxdb.CRUDLog{ - CreatedAt: fakeDate, - UpdatedAt: fakeDate, + { + Tag: influxdb.Tag{ + Key: "k2", + // empty tag value to trigger validation error + Value: "", + }, + Operator: influxdb.RegexEqual, }, }, - MessageTemplate: "msg1", }, + MessageTemplate: "msg1", }, - userResourceMapping: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, + }, + wants: wants{ + err: &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "tag must contain a key and a value", }, }, }, @@ -312,7 +413,7 @@ func CreateNotificationRule( for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, tasks, done := init(tt.fields, t) defer done() ctx := context.Background() nrc := influxdb.NotificationRuleCreate{ @@ -320,33 +421,54 @@ func CreateNotificationRule( Status: influxdb.Active, } err := s.CreateNotificationRule(ctx, nrc, tt.args.userID) - if (err != nil) != (tt.wants.err != nil) { - t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err) - } - if tt.wants.err == nil && !tt.args.notificationRule.GetID().Valid() { - t.Fatalf("notification rule ID not set from CreateNotificationRule") - } + if tt.wants.err != nil { + // expected error case + if !reflect.DeepEqual(tt.wants.err, err) { + t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err) + } - if err != nil && tt.wants.err != nil { - if influxdb.ErrorCode(err) != influxdb.ErrorCode(tt.wants.err) { - t.Fatalf("expected error messages to match '%v' got '%v'", influxdb.ErrorCode(tt.wants.err), influxdb.ErrorCode(err)) + // ensure no rules can be located + _, n, err := s.FindNotificationRules(ctx, influxdb.NotificationRuleFilter{}) + if err != nil { + t.Fatal(err) } - } - urmFilter := influxdb.UserResourceMappingFilter{ - UserID: tt.args.userID, - ResourceType: influxdb.NotificationRuleResourceType, + if existing := len(tt.fields.NotificationRules); n > existing { + t.Errorf("expected no rules to be created, found %d", n-existing) + } + } else { + nr, err := s.FindNotificationRuleByID(ctx, tt.args.notificationRule.GetID()) + if err != nil { + t.Errorf("failed to retrieve notification rules: %v", err) + } + + if diff := cmp.Diff(nr, tt.wants.notificationRule, notificationRuleCmpOptions...); diff != "" { + t.Errorf("notificationRules are different -got/+want\ndiff %s", diff) + } } - filter := influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: urmFilter, + if tt.wants.task == nil || !tt.wants.task.ID.Valid() { + // if not tasks or a task with an invalid ID is provided (0) then assume + // no tasks should be persisted + _, n, err := tasks.FindTasks(ctx, influxdb.TaskFilter{}) + if err != nil { + t.Fatal(err) + } + + if n > 0 { + t.Errorf("expected zero tasks to be created, instead found %d", n) + } + + return } - nrs, _, err := s.FindNotificationRules(ctx, filter) + + task, err := tasks.FindTaskByID(ctx, tt.wants.task.ID) if err != nil { - t.Fatalf("failed to retrieve notification rules: %v", err) + t.Fatal(err) } - if diff := cmp.Diff(nrs, tt.wants.notificationRules, notificationRuleCmpOptions...); diff != "" { - t.Errorf("notificationRules are different -got/+want\ndiff %s", diff) + + if diff := cmp.Diff(task, tt.wants.task, taskCmpOptions...); diff != "" { + t.Errorf("task is different -got/+want\ndiff %s", diff) } }) } @@ -354,7 +476,7 @@ func CreateNotificationRule( // FindNotificationRuleByID testing. func FindNotificationRuleByID( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -374,20 +496,6 @@ func FindNotificationRuleByID( { name: "bad id", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -439,20 +547,6 @@ func FindNotificationRuleByID( { name: "not found", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -504,20 +598,6 @@ func FindNotificationRuleByID( { name: "basic find telegraf config by id", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -582,7 +662,7 @@ func FindNotificationRuleByID( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -597,7 +677,7 @@ func FindNotificationRuleByID( // FindNotificationRules testing func FindNotificationRules( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -618,15 +698,10 @@ func FindNotificationRules( { name: "find nothing (empty set)", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{}, - NotificationRules: []influxdb.NotificationRule{}, + NotificationRules: []influxdb.NotificationRule{}, }, args: args{ - filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, + filter: influxdb.NotificationRuleFilter{}, }, wants: wants{ notificationRules: []influxdb.NotificationRule{}, @@ -635,20 +710,6 @@ func FindNotificationRules( { name: "find all notification rules", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -688,12 +749,7 @@ func FindNotificationRules( }, }, args: args{ - filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(sixID), - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, + filter: influxdb.NotificationRuleFilter{}, }, wants: wants{ notificationRules: []influxdb.NotificationRule{ @@ -735,93 +791,6 @@ func FindNotificationRules( }, }, }, - { - name: "find owners only", - fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, - NotificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(oneID), - Name: "name1", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink1", - SleepUntil: &time3, - Every: mustDuration("1h"), - CRUDLog: influxdb.CRUDLog{ - CreatedAt: timeGen1.Now(), - UpdatedAt: timeGen2.Now(), - }, - }, - Channel: "channel1", - MessageTemplate: "msg1", - }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(twoID), - Name: "name2", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink2", - SleepUntil: &time3, - Every: mustDuration("1h"), - CRUDLog: influxdb.CRUDLog{ - CreatedAt: timeGen1.Now(), - UpdatedAt: timeGen2.Now(), - }, - }, - MessageTemplate: "msg", - }, - }, - }, - args: args{ - filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(sixID), - ResourceType: influxdb.NotificationRuleResourceType, - UserType: influxdb.Owner, - }, - }, - }, - wants: wants{ - notificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(oneID), - Name: "name1", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink1", - SleepUntil: &time3, - Every: mustDuration("1h"), - CRUDLog: influxdb.CRUDLog{ - CreatedAt: timeGen1.Now(), - UpdatedAt: timeGen2.Now(), - }, - }, - Channel: "channel1", - MessageTemplate: "msg1", - }, - }, - }, - }, { name: "filter by organization id only", fields: NotificationRuleFields{ @@ -835,26 +804,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -891,7 +840,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - OrgID: idPtr(MustIDBase16(oneID)), + OrgID: MustIDBase16Ptr(oneID), }, }, wants: wants{ @@ -922,26 +871,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -978,7 +907,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - Organization: strPtr("org4"), + Organization: pointer.String("org4"), }, }, wants: wants{ @@ -1007,256 +936,6 @@ func FindNotificationRules( }, }, }, - // { - // name: "filter by tags", - // fields: NotificationRuleFields{ - // Orgs: []*influxdb.Organization{ - // { - // ID: MustIDBase16(oneID), - // Name: "org1", - // }, - // { - // ID: MustIDBase16(fourID), - // Name: "org4", - // }, - // }, - // UserResourceMappings: []*influxdb.UserResourceMapping{ - // { - // ResourceID: MustIDBase16(oneID), - // ResourceType: influxdb.NotificationRuleResourceType, - // UserID: MustIDBase16(sixID), - // UserType: influxdb.Owner, - // }, - // { - // ResourceID: MustIDBase16(twoID), - // ResourceType: influxdb.NotificationRuleResourceType, - // UserID: MustIDBase16(sixID), - // UserType: influxdb.Member, - // }, - // { - // ResourceID: MustIDBase16(fourID), - // ResourceType: influxdb.NotificationRuleResourceType, - // UserID: MustIDBase16(sixID), - // UserType: influxdb.Owner, - // }, - // }, - // NotificationRules: []influxdb.NotificationRule{ - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(oneID), - // OrgID: MustIDBase16(fourID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr1", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // Channel: "ch1", - // MessageTemplate: "msg1", - // }, - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(twoID), - // OrgID: MustIDBase16(fourID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr2", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // { - // Tag: influxdb.Tag{ - // Key: "location", - // Value: "paris", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // MessageTemplate: "body2", - // }, - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(fourID), - // OrgID: MustIDBase16(oneID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr3", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // { - // Tag: influxdb.Tag{ - // Key: "location", - // Value: "paris", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // MessageTemplate: "msg", - // }, - // }, - // }, - // args: args{ - // filter: influxdb.NotificationRuleFilter{ - // Organization: strPtr("org4"), - // Tags: []influxdb.Tag{ - // { - // Key: "environment", - // Value: "production", - // }, - // { - // Key: "location", - // Value: "paris", - // }, - // }, - // }, - // }, - // wants: wants{ - // notificationRules: []influxdb.NotificationRule{ - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(fourID), - // OrgID: MustIDBase16(oneID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr3", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // { - // Tag: influxdb.Tag{ - // Key: "location", - // Value: "paris", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // MessageTemplate: "msg", - // }, - // }, - // }, - // }, - { - name: "find owners and restrict by organization", - fields: NotificationRuleFields{ - Orgs: []*influxdb.Organization{ - { - ID: MustIDBase16(oneID), - Name: "org1", - }, - { - ID: MustIDBase16(fourID), - Name: "org4", - }, - }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, - NotificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(oneID), - OrgID: MustIDBase16(fourID), - EndpointID: 1, - OwnerID: MustIDBase16(sixID), - Name: "nr1", - }, - Channel: "ch1", - MessageTemplate: "msg1", - }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(twoID), - OrgID: MustIDBase16(fourID), - OwnerID: MustIDBase16(sixID), - EndpointID: 1, - Name: "nr2", - }, - MessageTemplate: "body2", - }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(fourID), - OrgID: MustIDBase16(oneID), - OwnerID: MustIDBase16(sixID), - EndpointID: 1, - Name: "nr3", - }, - MessageTemplate: "msg", - }, - }, - }, - args: args{ - filter: influxdb.NotificationRuleFilter{ - OrgID: idPtr(MustIDBase16(oneID)), - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(sixID), - ResourceType: influxdb.NotificationRuleResourceType, - UserType: influxdb.Owner, - }, - }, - }, - wants: wants{ - notificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(fourID), - OrgID: MustIDBase16(oneID), - OwnerID: MustIDBase16(sixID), - EndpointID: 1, - Name: "nr3", - }, - MessageTemplate: "msg", - }, - }, - }, - }, { name: "look for organization not bound to any notification rule", fields: NotificationRuleFields{ @@ -1270,26 +949,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1326,7 +985,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - OrgID: idPtr(MustIDBase16(oneID)), + OrgID: MustIDBase16Ptr(oneID), }, }, wants: wants{ @@ -1346,26 +1005,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1447,26 +1086,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1547,26 +1166,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1603,10 +1202,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - }, + OrgID: MustIDBase16Ptr(threeID), }, }, wants: wants{}, @@ -1615,7 +1211,7 @@ func FindNotificationRules( for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -1634,7 +1230,7 @@ func FindNotificationRules( // UpdateNotificationRule testing. func UpdateNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -1657,20 +1253,6 @@ func UpdateNotificationRule( name: "can't find the id", fields: NotificationRuleFields{ TimeGenerator: fakeGenerator, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1738,20 +1320,6 @@ func UpdateNotificationRule( fields: NotificationRuleFields{ TimeGenerator: fakeGenerator, IDGenerator: mock.NewIDGenerator(twoID, t), - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Tasks: []influxdb.TaskCreate{ { OwnerID: MustIDBase16(sixID), @@ -1767,7 +1335,7 @@ func UpdateNotificationRule( Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -1882,7 +1450,7 @@ func UpdateNotificationRule( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -1903,7 +1471,7 @@ func UpdateNotificationRule( // PatchNotificationRule testing. func PatchNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { @@ -1930,20 +1498,6 @@ func PatchNotificationRule( name: "can't find the id", fields: NotificationRuleFields{ TimeGenerator: fakeGenerator, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2010,27 +1564,13 @@ func PatchNotificationRule( `, }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Endpoints: []influxdb.NotificationEndpoint{ &endpoint.Slack{ URL: "http://localhost:7777", Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -2149,27 +1689,13 @@ func PatchNotificationRule( `, }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Endpoints: []influxdb.NotificationEndpoint{ &endpoint.Slack{ URL: "http://localhost:7777", Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -2278,7 +1804,7 @@ func PatchNotificationRule( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -2293,18 +1819,17 @@ func PatchNotificationRule( // DeleteNotificationRule testing. func DeleteNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { - id influxdb.ID - userID influxdb.ID + id influxdb.ID + orgID influxdb.ID } type wants struct { - notificationRules []influxdb.NotificationRule - userResourceMappings []*influxdb.UserResourceMapping - err error + notificationRules []influxdb.NotificationRule + err error } tests := []struct { name string @@ -2315,20 +1840,6 @@ func DeleteNotificationRule( { name: "bad id", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2368,28 +1879,14 @@ func DeleteNotificationRule( }, }, args: args{ - id: influxdb.ID(0), - userID: MustIDBase16(sixID), + id: influxdb.ID(0), + orgID: MustIDBase16(fourID), }, wants: wants{ err: &influxdb.Error{ Code: influxdb.EInvalid, Msg: "provided notification rule ID has invalid format", }, - userResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, notificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2432,20 +1929,6 @@ func DeleteNotificationRule( { name: "none existing config", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, IDGenerator: mock.NewIDGenerator(twoID, t), Tasks: []influxdb.TaskCreate{ { @@ -2502,28 +1985,14 @@ func DeleteNotificationRule( }, }, args: args{ - id: MustIDBase16(fourID), - userID: MustIDBase16(sixID), + id: MustIDBase16(fourID), + orgID: MustIDBase16(fourID), }, wants: wants{ err: &influxdb.Error{ Code: influxdb.ENotFound, Msg: "notification rule not found", }, - userResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, notificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2566,20 +2035,6 @@ func DeleteNotificationRule( { name: "regular delete", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Tasks: []influxdb.TaskCreate{ { OwnerID: MustIDBase16(sixID), @@ -2637,18 +2092,10 @@ func DeleteNotificationRule( }, }, args: args{ - id: MustIDBase16(twoID), - userID: MustIDBase16(sixID), + id: MustIDBase16(twoID), + orgID: MustIDBase16(fourID), }, wants: wants{ - userResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, notificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2675,17 +2122,14 @@ func DeleteNotificationRule( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() err := s.DeleteNotificationRule(ctx, tt.args.id) ErrorsEqual(t, err, tt.wants.err) filter := influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: tt.args.userID, - ResourceType: influxdb.NotificationRuleResourceType, - }, + OrgID: &tt.args.orgID, } nrs, n, err := s.FindNotificationRules(ctx, filter) if err != nil && tt.wants.err == nil { @@ -2707,3 +2151,70 @@ func DeleteNotificationRule( }) } } + +// MustIDBase16 is an helper to ensure a correct ID is built during testing. +func MustIDBase16(s string) influxdb.ID { + id, err := influxdb.IDFromString(s) + if err != nil { + panic(err) + } + return *id +} + +// MustIDBase16Ptr is an helper to ensure a correct *ID is built during testing. +func MustIDBase16Ptr(s string) *influxdb.ID { + id := MustIDBase16(s) + return &id +} + +// ErrorsEqual checks to see if the provided errors are equivalent. +func ErrorsEqual(t *testing.T, actual, expected error) { + t.Helper() + if expected == nil && actual == nil { + return + } + + if expected == nil && actual != nil { + t.Errorf("unexpected error %s", actual.Error()) + } + + if expected != nil && actual == nil { + t.Errorf("expected error %s but received nil", expected.Error()) + } + + if influxdb.ErrorCode(expected) != influxdb.ErrorCode(actual) { + t.Logf("\nexpected: %v\nactual: %v\n\n", expected, actual) + t.Errorf("expected error code %q but received %q", influxdb.ErrorCode(expected), influxdb.ErrorCode(actual)) + } + + if influxdb.ErrorMessage(expected) != influxdb.ErrorMessage(actual) { + t.Logf("\nexpected: %v\nactual: %v\n\n", expected, actual) + t.Errorf("expected error message %q but received %q", influxdb.ErrorMessage(expected), influxdb.ErrorMessage(actual)) + } +} + +func idPtr(id influxdb.ID) *influxdb.ID { + return &id +} + +func mustDuration(d string) *notification.Duration { + dur, err := time.ParseDuration(d) + if err != nil { + panic(err) + } + + ndur, err := notification.FromTimeDuration(dur) + if err != nil { + panic(err) + } + + // Filter out the zero values from the duration. + durs := make([]ast.Duration, 0, len(ndur.Values)) + for _, d := range ndur.Values { + if d.Magnitude != 0 { + durs = append(durs, d) + } + } + ndur.Values = durs + return &ndur +} diff --git a/notification/rule/service/service_test.go b/notification/rule/service/service_test.go new file mode 100644 index 00000000000..e0395222eff --- /dev/null +++ b/notification/rule/service/service_test.go @@ -0,0 +1,162 @@ +package service + +import ( + "context" + "errors" + "io/ioutil" + "os" + "testing" + + influxdb "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/bolt" + "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/kv/migration/all" + "github.com/influxdata/influxdb/v2/mock" + _ "github.com/influxdata/influxdb/v2/query/builtin" + "github.com/influxdata/influxdb/v2/query/fluxlang" + "github.com/influxdata/influxdb/v2/tenant" + "go.uber.org/zap/zaptest" +) + +func TestInmemNotificationRuleStore(t *testing.T) { + NotificationRuleStore(initInmemNotificationRuleStore, t) +} + +func initInmemNotificationRuleStore(f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) { + store := inmem.NewKVStore() + if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil { + t.Fatal(err) + } + + svc, tsvc, closeSvc := initNotificationRuleStore(store, f, t) + return svc, tsvc, func() { + closeSvc() + } +} + +func initBoltNotificationRuleStore(f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) { + store, closeBolt, err := newTestBoltStore(t) + if err != nil { + t.Fatal(err) + } + + svc, tsvc, closeSvc := initNotificationRuleStore(store, f, t) + return svc, tsvc, func() { + closeSvc() + closeBolt() + } +} + +func TestBoltNotificationRuleStore(t *testing.T) { + NotificationRuleStore(initBoltNotificationRuleStore, t) +} + +func initNotificationRuleStore(s kv.Store, f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) { + logger := zaptest.NewLogger(t) + kvsvc := kv.NewService(logger, s, kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) + kvsvc.IDGenerator = f.IDGenerator + kvsvc.TimeGenerator = f.TimeGenerator + if f.TimeGenerator == nil { + kvsvc.TimeGenerator = influxdb.RealTimeGenerator{} + } + + var ( + tenantStore = tenant.NewStore(s) + tenantSvc = tenant.NewService(tenantStore) + ) + + svc, err := NewRuleService(logger, s, kvsvc, tenantSvc, kvsvc) + if err != nil { + t.Fatal(err) + } + + svc.idGenerator = f.IDGenerator + if f.TimeGenerator != nil { + svc.timeGenerator = f.TimeGenerator + } + + ctx := context.Background() + for _, o := range f.Orgs { + withOrgID(tenantStore, o.ID, func() { + if err := tenantSvc.CreateOrganization(ctx, o); err != nil { + t.Fatalf("failed to populate org: %v", err) + } + }) + } + + for _, e := range f.Endpoints { + if err := kvsvc.CreateNotificationEndpoint(ctx, e, 1); err != nil { + t.Fatalf("failed to populate notification endpoint: %v", err) + } + } + + for _, nr := range f.NotificationRules { + nrc := influxdb.NotificationRuleCreate{ + NotificationRule: nr, + Status: influxdb.Active, + } + if err := svc.PutNotificationRule(ctx, nrc); err != nil { + t.Fatalf("failed to populate notification rule: %v", err) + } + } + + for _, c := range f.Tasks { + if _, err := kvsvc.CreateTask(ctx, c); err != nil { + t.Fatalf("failed to populate task: %v", err) + } + } + + return svc, kvsvc, func() { + for _, nr := range f.NotificationRules { + if err := svc.DeleteNotificationRule(ctx, nr.GetID()); err != nil { + t.Logf("failed to remove notification rule: %v", err) + } + } + for _, o := range f.Orgs { + if err := tenantSvc.DeleteOrganization(ctx, o.ID); err != nil { + t.Fatalf("failed to remove org: %v", err) + } + } + } +} + +func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) { + backup := store.OrgIDGen + defer func() { store.OrgIDGen = backup }() + + store.OrgIDGen = mock.NewStaticIDGenerator(orgID) + + fn() +} + +func newTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) { + f, err := ioutil.TempFile("", "influxdata-bolt-") + if err != nil { + return nil, nil, errors.New("unable to open temporary boltdb file") + } + f.Close() + + ctx := context.Background() + logger := zaptest.NewLogger(t) + path := f.Name() + + // skip fsync to improve test performance + s := bolt.NewKVStore(logger, path, bolt.WithNoSync) + if err := s.Open(context.Background()); err != nil { + return nil, nil, err + } + + if err := all.Up(ctx, logger, s); err != nil { + return nil, nil, err + } + + close := func() { + s.Close() + os.Remove(path) + } + + return s, close, nil +} diff --git a/pkger/service.go b/pkger/service.go index a4b874a70c2..a485b430d4a 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -3536,10 +3536,11 @@ func (r *rollbackCoordinator) runTilEnd(ctx context.Context, orgID, userID influ defer cancel() defer func() { - if recover() != nil { + if err := recover(); err != nil { r.logger.Error( "panic applying "+resource, zap.String("stack_trace", fmt.Sprintf("%+v", stack.Trace())), + zap.Reflect("panic", err), ) errStr.add(errMsg{ resource: resource, diff --git a/testing/notification_endpoint.go b/testing/notification_endpoint.go index ffd5ee865af..1f71b3ab5f6 100644 --- a/testing/notification_endpoint.go +++ b/testing/notification_endpoint.go @@ -27,7 +27,6 @@ type NotificationEndpointFields struct { var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)} var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)} -var time3 = time.Date(2006, time.July, 15, 5, 23, 53, 10, time.UTC) var notificationEndpointCmpOptions = cmp.Options{ cmp.Transformer("Sort", func(in []influxdb.NotificationEndpoint) []influxdb.NotificationEndpoint {