diff --git a/authorizer/authorize_find.go b/authorizer/authorize_find.go index f15c3b5f677..f517ceee01a 100644 --- a/authorizer/authorize_find.go +++ b/authorizer/authorize_find.go @@ -284,7 +284,7 @@ func AuthorizeFindChecks(ctx context.Context, rs []influxdb.Check) ([]influxdb.C } // AuthorizeFindUserResourceMappings takes the given items and returns only the ones that the user is authorized to read. -func AuthorizeFindUserResourceMappings(ctx context.Context, os OrganizationService, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) { +func AuthorizeFindUserResourceMappings(ctx context.Context, os OrgIDResolver, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) { // This filters without allocating // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating rrs := rs[:0] diff --git a/authorizer/label.go b/authorizer/label.go index d3b50590ef3..38ef58b8674 100644 --- a/authorizer/label.go +++ b/authorizer/label.go @@ -2,7 +2,6 @@ package authorizer import ( "context" - "errors" "github.com/influxdata/influxdb/v2" ) @@ -12,16 +11,16 @@ var _ influxdb.LabelService = (*LabelService)(nil) // LabelService wraps a influxdb.LabelService and authorizes actions // against it appropriately. type LabelService struct { - s influxdb.LabelService - orgSvc OrganizationService + s influxdb.LabelService + orgIDResolver OrgIDResolver } // NewLabelServiceWithOrg constructs an instance of an authorizing label serivce. // Replaces NewLabelService. -func NewLabelServiceWithOrg(s influxdb.LabelService, orgSvc OrganizationService) *LabelService { +func NewLabelServiceWithOrg(s influxdb.LabelService, orgIDResolver OrgIDResolver) *LabelService { return &LabelService{ - s: s, - orgSvc: orgSvc, + s: s, + orgIDResolver: orgIDResolver, } } @@ -55,10 +54,8 @@ func (s *LabelService) FindResourceLabels(ctx context.Context, filter influxdb.L if err := filter.ResourceType.Valid(); err != nil { return nil, err } - if s.orgSvc == nil { - return nil, errors.New("failed to find orgSvc") - } - orgID, err := s.orgSvc.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) + + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) if err != nil { return nil, err } diff --git a/authorizer/urm.go b/authorizer/urm.go index 02566aa6e18..b57097a3f9e 100644 --- a/authorizer/urm.go +++ b/authorizer/urm.go @@ -6,19 +6,19 @@ import ( "github.com/influxdata/influxdb/v2" ) -type OrganizationService interface { +type OrgIDResolver interface { FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) } type URMService struct { - s influxdb.UserResourceMappingService - orgService OrganizationService + s influxdb.UserResourceMappingService + orgIDResolver OrgIDResolver } -func NewURMService(orgSvc OrganizationService, s influxdb.UserResourceMappingService) *URMService { +func NewURMService(orgIDResolver OrgIDResolver, s influxdb.UserResourceMappingService) *URMService { return &URMService{ - s: s, - orgService: orgSvc, + s: s, + orgIDResolver: orgIDResolver, } } @@ -27,11 +27,11 @@ func (s *URMService) FindUserResourceMappings(ctx context.Context, filter influx if err != nil { return nil, 0, err } - return AuthorizeFindUserResourceMappings(ctx, s.orgService, urms) + return AuthorizeFindUserResourceMappings(ctx, s.orgIDResolver, urms) } func (s *URMService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error { - orgID, err := s.orgService.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID) + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID) if err != nil { return err } @@ -49,7 +49,7 @@ func (s *URMService) DeleteUserResourceMapping(ctx context.Context, resourceID i } for _, urm := range urms { - orgID, err := s.orgService.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID) + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID) if err != nil { return err } diff --git a/authorizer/urm_test.go b/authorizer/urm_test.go index bc1da8cf1a2..e1dd7de79e6 100644 --- a/authorizer/urm_test.go +++ b/authorizer/urm_test.go @@ -23,7 +23,7 @@ func (s *OrgService) FindResourceOrganizationID(ctx context.Context, rt influxdb func TestURMService_FindUserResourceMappings(t *testing.T) { type fields struct { UserResourceMappingService influxdb.UserResourceMappingService - OrgService authorizer.OrganizationService + OrgService authorizer.OrgIDResolver } type args struct { permission influxdb.Permission @@ -146,7 +146,7 @@ func TestURMService_FindUserResourceMappings(t *testing.T) { func TestURMService_WriteUserResourceMapping(t *testing.T) { type fields struct { UserResourceMappingService influxdb.UserResourceMappingService - OrgService authorizer.OrganizationService + OrgService authorizer.OrgIDResolver } type args struct { permission influxdb.Permission diff --git a/cmd/influx/task.go b/cmd/influx/task.go index 070e07310c4..ee7e08e1e5d 100644 --- a/cmd/influx/task.go +++ b/cmd/influx/task.go @@ -171,7 +171,7 @@ func taskFindF(cmd *cobra.Command, args []string) error { } filter.Limit = taskFindFlags.limit - var tasks []http.Task + var tasks []*influxdb.Task if taskFindFlags.id != "" { id, err := influxdb.IDFromString(taskFindFlags.id) @@ -184,7 +184,7 @@ func taskFindF(cmd *cobra.Command, args []string) error { return err } - tasks = append(tasks, *task) + tasks = append(tasks, task) } else { tasks, _, err = s.FindTasks(context.Background(), filter) if err != nil { @@ -322,8 +322,8 @@ func taskDeleteF(cmd *cobra.Command, args []string) error { type taskPrintOpts struct { hideHeaders bool json bool - task *http.Task - tasks []http.Task + task *influxdb.Task + tasks []*influxdb.Task } func printTasks(w io.Writer, opts taskPrintOpts) error { @@ -351,7 +351,7 @@ func printTasks(w io.Writer, opts taskPrintOpts) error { ) if opts.task != nil { - opts.tasks = append(opts.tasks, *opts.task) + opts.tasks = append(opts.tasks, opts.task) } for _, t := range opts.tasks { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 6df37580e02..b7aa2f2746a 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -31,6 +31,7 @@ import ( iqlquery "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/inmem" "github.com/influxdata/influxdb/v2/internal/fs" + "github.com/influxdata/influxdb/v2/internal/resource" "github.com/influxdata/influxdb/v2/kit/cli" "github.com/influxdata/influxdb/v2/kit/feature" overrideflagger "github.com/influxdata/influxdb/v2/kit/feature/override" @@ -45,6 +46,7 @@ import ( "github.com/influxdata/influxdb/v2/label" influxlogger "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/nats" + ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service" "github.com/influxdata/influxdb/v2/pkger" infprom "github.com/influxdata/influxdb/v2/prometheus" "github.com/influxdata/influxdb/v2/query" @@ -989,7 +991,14 @@ func (m *Launcher) run(ctx context.Context) (err error) { var notificationRuleSvc platform.NotificationRuleStore { coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor) - notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator) + notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService) + if err != nil { + return err + } + + // tasks service notification middleware which keeps task service up to date + // with persisted changes to notification rules. + notificationRuleSvc = middleware.NewNotificationRuleStore(notificationRuleSvc, m.kvService, coordinator) } // NATS streaming server @@ -1121,6 +1130,25 @@ func (m *Launcher) run(ctx context.Context) (err error) { onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, metric.WithSuffix("new")) // with metrics onboardSvc = tenant.NewOnboardingLogger(m.log.With(zap.String("handler", "onboard")), onboardSvc) // with logging + // orgIDResolver is a deprecated type which combines the lookups + // of multiple resources into one type, used to resolve the resources + // associated org ID. It is a stop-gap while we move this behaviour + // off of *kv.Service to aid in reducing the coupling on this type. + orgIDResolver := &resource.OrgIDResolver{ + AuthorizationFinder: authSvc, + BucketFinder: ts.BucketService, + OrganizationFinder: ts.OrganizationService, + DashboardFinder: dashboardSvc, + SourceFinder: sourceSvc, + TaskFinder: taskSvc, + TelegrafConfigFinder: telegrafSvc, + VariableFinder: variableSvc, + TargetFinder: scraperTargetSvc, + CheckFinder: checkSvc, + NotificationEndpointFinder: notificationEndpointStore, + NotificationRuleFinder: notificationRuleSvc, + } + m.apibackend = &http.APIBackend{ AssetsPath: m.assetsPath, HTTPErrorHandler: kithttp.ErrorHandler(0), @@ -1169,7 +1197,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { SecretService: secretSvc, LookupService: lookupSvc, DocumentService: m.kvService, - OrgLookupService: m.kvService, + OrgLookupService: orgIDResolver, WriteEventRecorder: infprom.NewEventRecorder("write"), QueryEventRecorder: infprom.NewEventRecorder("query"), Flagger: m.flagger, diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 7c06b577493..75e33a4455c 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -16,7 +16,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/lang" - platform "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/bolt" influxdbcontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/http" @@ -37,10 +37,10 @@ type TestLauncher struct { Path string // Initialized after calling the Setup() helper. - User *platform.User - Org *platform.Organization - Bucket *platform.Bucket - Auth *platform.Authorization + User *influxdb.User + Org *influxdb.Organization + Bucket *influxdb.Bucket + Auth *influxdb.Authorization httpClient *httpc.Client @@ -127,7 +127,7 @@ func (tl *TestLauncher) ShutdownOrFail(tb testing.TB, ctx context.Context) { // Setup creates a new user, bucket, org, and auth token. func (tl *TestLauncher) Setup() error { - results, err := tl.OnBoard(&platform.OnboardingRequest{ + results, err := tl.OnBoard(&influxdb.OnboardingRequest{ User: "USER", Password: "PASSWORD", Org: "ORG", @@ -153,13 +153,13 @@ func (tl *TestLauncher) SetupOrFail(tb testing.TB) { // OnBoard attempts an on-boarding request. // The on-boarding status is also reset to allow multiple user/org/buckets to be created. -func (tl *TestLauncher) OnBoard(req *platform.OnboardingRequest) (*platform.OnboardingResults, error) { +func (tl *TestLauncher) OnBoard(req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { return tl.apibackend.OnboardingService.OnboardInitialUser(context.Background(), req) } // OnBoardOrFail attempts an on-boarding request or fails on error. // The on-boarding status is also reset to allow multiple user/org/buckets to be created. -func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingRequest) *platform.OnboardingResults { +func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *influxdb.OnboardingRequest) *influxdb.OnboardingResults { tb.Helper() res, err := tl.OnBoard(req) if err != nil { @@ -169,7 +169,7 @@ func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingReq } // WriteOrFail attempts a write to the organization and bucket identified by to or fails if there is an error. -func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *platform.OnboardingResults, data string) { +func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *influxdb.OnboardingResults, data string) { tb.Helper() resp, err := nethttp.DefaultClient.Do(tl.NewHTTPRequestOrFail(tb, "POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", to.Org.ID, to.Bucket.ID), to.Auth.Token, data)) if err != nil { @@ -295,7 +295,7 @@ func (tl *TestLauncher) QueryAndNopConsume(ctx context.Context, req *query.Reque // FluxQueryOrFail performs a query to the specified organization and returns the results // or fails if there is an error. -func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organization, token string, query string) string { +func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *influxdb.Organization, token string, query string) string { tb.Helper() b, err := http.SimpleQuery(tl.URL(), query, org.Name, token) @@ -308,7 +308,7 @@ func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organizatio // QueryFlux returns the csv response from a flux query. // It also removes all the \r to make it easier to write tests. -func (tl *TestLauncher) QueryFlux(tb testing.TB, org *platform.Organization, token, query string) string { +func (tl *TestLauncher) QueryFlux(tb testing.TB, org *influxdb.Organization, token, query string) string { tb.Helper() b, err := http.SimpleQuery(tl.URL(), query, org.Name, token) @@ -367,7 +367,7 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService { return &http.BucketService{Client: tl.HTTPClient(tb)} } -func (tl *TestLauncher) CheckService() platform.CheckService { +func (tl *TestLauncher) CheckService() influxdb.CheckService { return tl.kvService } @@ -386,11 +386,12 @@ func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.Notific return http.NewNotificationEndpointService(tl.HTTPClient(tb)) } -func (tl *TestLauncher) NotificationRuleService() platform.NotificationRuleStore { - return tl.kvService +func (tl *TestLauncher) NotificationRuleService(tb testing.TB) influxdb.NotificationRuleStore { + tb.Helper() + return http.NewNotificationRuleService(tl.HTTPClient(tb)) } -func (tl *TestLauncher) OrgService(tb testing.TB) platform.OrganizationService { +func (tl *TestLauncher) OrgService(tb testing.TB) influxdb.OrganizationService { return tl.kvService } @@ -398,7 +399,7 @@ func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC { return &pkger.HTTPRemoteService{Client: tl.HTTPClient(tb)} } -func (tl *TestLauncher) TaskServiceKV() platform.TaskService { +func (tl *TestLauncher) TaskServiceKV(tb testing.TB) influxdb.TaskService { return tl.kvService } @@ -416,7 +417,7 @@ func (tl *TestLauncher) AuthorizationService(tb testing.TB) *http.AuthorizationS return &http.AuthorizationService{Client: tl.HTTPClient(tb)} } -func (tl *TestLauncher) TaskService(tb testing.TB) *http.TaskService { +func (tl *TestLauncher) TaskService(tb testing.TB) influxdb.TaskService { return &http.TaskService{Client: tl.HTTPClient(tb)} } diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 8a09788bd8e..833d6daa60d 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/http" "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/check" @@ -833,9 +832,9 @@ func TestLauncher_Pkger(t *testing.T) { deleteKillCount: 3, }), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), - pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)), pkger.WithStore(pkger.NewStoreKV(l.Launcher.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -1134,11 +1133,11 @@ func TestLauncher_Pkger(t *testing.T) { pkger.WithLabelSVC(l.LabelService(t)), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithNotificationRuleSVC(&fakeRuleStore{ - NotificationRuleStore: l.NotificationRuleService(), + NotificationRuleStore: l.NotificationRuleService(t), createKillCount: 2, }), pkger.WithStore(pkger.NewStoreKV(l.Launcher.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -2319,10 +2318,10 @@ func TestLauncher_Pkger(t *testing.T) { createKillCount: 2, // hits error on 3rd attempt at creating a mapping }), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), - pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)), pkger.WithOrganizationService(l.OrganizationService()), pkger.WithStore(pkger.NewStoreKV(l.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -2356,13 +2355,13 @@ func TestLauncher_Pkger(t *testing.T) { require.NoError(t, err) assert.Empty(t, endpoints) - rules, _, err := l.NotificationRuleService().FindNotificationRules(ctx, influxdb.NotificationRuleFilter{ + rules, _, err := l.NotificationRuleService(t).FindNotificationRules(ctx, influxdb.NotificationRuleFilter{ OrgID: &l.Org.ID, }) require.NoError(t, err) assert.Empty(t, rules) - tasks, _, err := l.TaskServiceKV().FindTasks(ctx, influxdb.TaskFilter{ + tasks, _, err := l.TaskServiceKV(t).FindTasks(ctx, influxdb.TaskFilter{ OrganizationID: &l.Org.ID, }) require.NoError(t, err) @@ -3445,10 +3444,10 @@ spec: pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), - pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)), pkger.WithOrganizationService(l.OrganizationService()), pkger.WithStore(pkger.NewStoreKV(l.kvStore)), - pkger.WithTaskSVC(l.TaskServiceKV()), + pkger.WithTaskSVC(l.TaskServiceKV(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -4902,7 +4901,7 @@ func (r resourceChecker) mustDeleteLabel(t *testing.T, id influxdb.ID) { func (r resourceChecker) getRule(t *testing.T, getOpt getResourceOptFn) (influxdb.NotificationRule, error) { t.Helper() - ruleSVC := r.tl.NotificationRuleService() + ruleSVC := r.tl.NotificationRuleService(t) var ( rule influxdb.NotificationRule @@ -4944,16 +4943,16 @@ func (r resourceChecker) mustGetRule(t *testing.T, getOpt getResourceOptFn) infl func (r resourceChecker) mustDeleteRule(t *testing.T, id influxdb.ID) { t.Helper() - require.NoError(t, r.tl.NotificationRuleService().DeleteNotificationRule(ctx, id)) + require.NoError(t, r.tl.NotificationRuleService(t).DeleteNotificationRule(ctx, id)) } -func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Task, error) { +func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (influxdb.Task, error) { t.Helper() taskSVC := r.tl.TaskService(t) var ( - task *http.Task + task *influxdb.Task err error ) switch opt := getOpt(); { @@ -4963,11 +4962,11 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta OrganizationID: &r.tl.Org.ID, }) if err != nil { - return http.Task{}, err + return influxdb.Task{}, err } for _, tt := range tasks { if tt.Name == opt.name { - task = &tasks[0] + task = tasks[0] break } } @@ -4977,13 +4976,13 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta require.Fail(t, "did not provide a valid get option") } if task == nil { - return http.Task{}, errors.New("did not find expected task by name") + return influxdb.Task{}, errors.New("did not find expected task by name") } return *task, err } -func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) http.Task { +func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) influxdb.Task { t.Helper() task, err := r.getTask(t, getOpt) diff --git a/http/api_handler.go b/http/api_handler.go index 474f7eddd5a..ba8f66590d1 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -87,7 +87,7 @@ type APIBackend struct { SecretService influxdb.SecretService LookupService influxdb.LookupService ChronografService *server.Service - OrgLookupService authorizer.OrganizationService + OrgLookupService authorizer.OrgIDResolver DocumentService influxdb.DocumentService NotificationRuleStore influxdb.NotificationRuleStore NotificationEndpointService influxdb.NotificationEndpointService diff --git a/http/document_service_test.go b/http/document_service_test.go index 914b165608a..1cf96133ca2 100644 --- a/http/document_service_test.go +++ b/http/document_service_test.go @@ -65,13 +65,14 @@ func setup(t *testing.T) (func(auth influxdb.Authorizer) *httptest.Server, func( if err := ds.CreateDocument(ctx, adoc); err != nil { panic(err) } + // Organizations are needed only for creation. // Need to cleanup for comparison later. adoc.Organizations = nil backend := NewMockDocumentBackend(t) backend.HTTPErrorHandler = http.ErrorHandler(0) backend.DocumentService = authorizer.NewDocumentService(svc) - backend.LabelService = authorizer.NewLabelServiceWithOrg(svc, svc) + backend.LabelService = authorizer.NewLabelServiceWithOrg(svc, staticOrgIDResolver(org.ID)) serverFn := func(auth influxdb.Authorizer) *httptest.Server { handler := httpmock.NewAuthMiddlewareHandler(NewDocumentHandler(backend), auth) return httptest.NewServer(handler) @@ -751,3 +752,9 @@ func DeleteLabel(t *testing.T) { } }) } + +type staticOrgIDResolver influxdb.ID + +func (s staticOrgIDResolver) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) { + return (influxdb.ID)(s), nil +} diff --git a/http/notification_rule.go b/http/notification_rule.go index 1667bceb25b..44afb2d6559 100644 --- a/http/notification_rule.go +++ b/http/notification_rule.go @@ -163,21 +163,41 @@ type notificationRuleResponse struct { LastRunError string `json:"LastRunError,omitempty"` } +type ruleResponseMeta struct { + Labels []influxdb.Label `json:"labels"` + Links notificationRuleLinks `json:"links"` + Status string `json:"status"` + LatestCompleted time.Time `json:"latestCompleted,omitempty"` + LatestScheduled time.Time `json:"latestScheduled,omitempty"` + LastRunStatus string `json:"lastRunStatus,omitempty"` + LastRunError string `json:"lastRunError,omitempty"` +} + +func (resp *notificationRuleResponse) UnmarshalJSON(v []byte) (err error) { + var responseMeta ruleResponseMeta + if err = json.Unmarshal(v, &responseMeta); err != nil { + return + } + + resp.Labels = responseMeta.Labels + resp.Links = responseMeta.Links + resp.Status = responseMeta.Status + resp.LatestCompleted = responseMeta.LatestCompleted + resp.LatestScheduled = responseMeta.LatestScheduled + resp.LastRunStatus = responseMeta.LastRunStatus + resp.LastRunError = responseMeta.LastRunError + + resp.NotificationRule, err = rule.UnmarshalJSON(v) + return +} + func (resp notificationRuleResponse) MarshalJSON() ([]byte, error) { b1, err := json.Marshal(resp.NotificationRule) if err != nil { return nil, err } - b2, err := json.Marshal(struct { - Labels []influxdb.Label `json:"labels"` - Links notificationRuleLinks `json:"links"` - Status string `json:"status"` - LatestCompleted time.Time `json:"latestCompleted,omitempty"` - LatestScheduled time.Time `json:"latestScheduled,omitempty"` - LastRunStatus string `json:"lastRunStatus,omitempty"` - LastRunError string `json:"lastRunError,omitempty"` - }{ + b2, err := json.Marshal(ruleResponseMeta{ Links: resp.Links, Labels: resp.Labels, Status: resp.Status, @@ -825,6 +845,7 @@ func (s *NotificationRuleService) FindNotificationRules(ctx context.Context, fil for _, r := range resp.NotificationRules { rules = append(rules, r.rule) } + return rules, len(rules), nil } diff --git a/http/task_service.go b/http/task_service.go index 7ddfee8f023..6bdc5d5ae04 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/pkg/httpc" + "github.com/influxdata/influxdb/v2/task/options" "go.uber.org/zap" ) @@ -220,6 +221,54 @@ func NewFrontEndTask(t influxdb.Task) Task { } } +func convertTask(t Task) *influxdb.Task { + var ( + latestCompleted time.Time + createdAt time.Time + updatedAt time.Time + offset time.Duration + ) + + if t.LatestCompleted != "" { + latestCompleted, _ = time.Parse(time.RFC3339, t.LatestCompleted) + } + + if t.CreatedAt != "" { + createdAt, _ = time.Parse(time.RFC3339, t.CreatedAt) + } + + if t.UpdatedAt != "" { + updatedAt, _ = time.Parse(time.RFC3339, t.UpdatedAt) + } + + if t.Offset != "" { + var duration options.Duration + if err := duration.Parse(t.Offset); err == nil { + offset, _ = duration.DurationFrom(time.Now()) + } + } + + return &influxdb.Task{ + ID: t.ID, + OrganizationID: t.OrganizationID, + Organization: t.Organization, + OwnerID: t.OwnerID, + Name: t.Name, + Description: t.Description, + Status: t.Status, + Flux: t.Flux, + Every: t.Every, + Cron: t.Cron, + Offset: offset, + LatestCompleted: latestCompleted, + LastRunStatus: t.LastRunStatus, + LastRunError: t.LastRunError, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + Metadata: t.Metadata, + } +} + func customParseDuration(d time.Duration) string { str := "" if d < 0 { @@ -1414,7 +1463,7 @@ type TaskService struct { } // FindTaskByID returns a single task -func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, error) { +func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1424,12 +1473,12 @@ func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, e return nil, err } - return &tr.Task, nil + return convertTask(tr.Task), nil } // FindTasks returns a list of tasks that match a filter (limit 100) and the total count // of matching tasks. -func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]Task, int, error) { +func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1470,15 +1519,15 @@ func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) return nil, 0, err } - tasks := make([]Task, len(tr.Tasks)) + tasks := make([]*influxdb.Task, len(tr.Tasks)) for i := range tr.Tasks { - tasks[i] = tr.Tasks[i].Task + tasks[i] = convertTask(tr.Tasks[i].Task) } return tasks, len(tasks), nil } // CreateTask creates a new task. -func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*Task, error) { +func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() var tr taskResponse @@ -1491,11 +1540,11 @@ func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*T return nil, err } - return &tr.Task, nil + return convertTask(tr.Task), nil } // UpdateTask updates a single task with changeset. -func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*Task, error) { +func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1507,7 +1556,7 @@ func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxd return nil, err } - return &tr.Task, nil + return convertTask(tr.Task), nil } // DeleteTask removes a task by ID and purges all associated data and scheduled runs. diff --git a/internal/resource/org_id.go b/internal/resource/org_id.go new file mode 100644 index 00000000000..e05fb6f9410 --- /dev/null +++ b/internal/resource/org_id.go @@ -0,0 +1,194 @@ +package resource + +import ( + "context" + "fmt" + + "github.com/influxdata/influxdb/v2" +) + +// OrgIDResolver is a type which combines multiple resource services +// in order to resolve the resources associated org ID. +// Ideally you do not need to use this type, it is mostly a stop-gap +// while we migrate responsibilities off of *kv.Service. +// Consider it deprecated. +type OrgIDResolver struct { + AuthorizationFinder interface { + FindAuthorizationByID(context.Context, influxdb.ID) (*influxdb.Authorization, error) + } + BucketFinder interface { + FindBucketByID(context.Context, influxdb.ID) (*influxdb.Bucket, error) + } + OrganizationFinder interface { + FindOrganizationByID(context.Context, influxdb.ID) (*influxdb.Organization, error) + } + DashboardFinder interface { + FindDashboardByID(context.Context, influxdb.ID) (*influxdb.Dashboard, error) + } + SourceFinder interface { + FindSourceByID(context.Context, influxdb.ID) (*influxdb.Source, error) + } + TaskFinder interface { + FindTaskByID(context.Context, influxdb.ID) (*influxdb.Task, error) + } + TelegrafConfigFinder interface { + FindTelegrafConfigByID(context.Context, influxdb.ID) (*influxdb.TelegrafConfig, error) + } + VariableFinder interface { + FindVariableByID(context.Context, influxdb.ID) (*influxdb.Variable, error) + } + TargetFinder interface { + GetTargetByID(context.Context, influxdb.ID) (*influxdb.ScraperTarget, error) + } + CheckFinder interface { + FindCheckByID(context.Context, influxdb.ID) (influxdb.Check, error) + } + NotificationEndpointFinder interface { + FindNotificationEndpointByID(context.Context, influxdb.ID) (influxdb.NotificationEndpoint, error) + } + NotificationRuleFinder interface { + FindNotificationRuleByID(context.Context, influxdb.ID) (influxdb.NotificationRule, error) + } +} + +// FindResourceOrganizationID is used to find the organization that a resource belongs to five the id of a resource and a resource type. +func (o *OrgIDResolver) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) { + switch rt { + case influxdb.AuthorizationsResourceType: + if o.AuthorizationFinder == nil { + break + } + + r, err := o.AuthorizationFinder.FindAuthorizationByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.BucketsResourceType: + if o.BucketFinder == nil { + break + } + + r, err := o.BucketFinder.FindBucketByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.OrgsResourceType: + if o.OrganizationFinder == nil { + break + } + + r, err := o.OrganizationFinder.FindOrganizationByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.ID, nil + case influxdb.DashboardsResourceType: + if o.DashboardFinder == nil { + break + } + + r, err := o.DashboardFinder.FindDashboardByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.SourcesResourceType: + if o.SourceFinder == nil { + break + } + + r, err := o.SourceFinder.FindSourceByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.TasksResourceType: + if o.TaskFinder == nil { + break + } + + r, err := o.TaskFinder.FindTaskByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.TelegrafsResourceType: + if o.TelegrafConfigFinder == nil { + break + } + + r, err := o.TelegrafConfigFinder.FindTelegrafConfigByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.VariablesResourceType: + if o.VariableFinder == nil { + break + } + + r, err := o.VariableFinder.FindVariableByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrganizationID, nil + case influxdb.ScraperResourceType: + if o.TargetFinder == nil { + break + } + + r, err := o.TargetFinder.GetTargetByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.OrgID, nil + case influxdb.ChecksResourceType: + if o.CheckFinder == nil { + break + } + + r, err := o.CheckFinder.FindCheckByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.GetOrgID(), nil + case influxdb.NotificationEndpointResourceType: + if o.NotificationEndpointFinder == nil { + break + } + + r, err := o.NotificationEndpointFinder.FindNotificationEndpointByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.GetOrgID(), nil + case influxdb.NotificationRuleResourceType: + if o.NotificationRuleFinder == nil { + break + } + + r, err := o.NotificationRuleFinder.FindNotificationRuleByID(ctx, id) + if err != nil { + return influxdb.InvalidID(), err + } + + return r.GetOrgID(), nil + } + + return influxdb.InvalidID(), &influxdb.Error{ + Msg: fmt.Sprintf("unsupported resource type %s", rt), + } +} diff --git a/kv/initial_migration.go b/kv/initial_migration.go index 4fa5ac78635..17661be0908 100644 --- a/kv/initial_migration.go +++ b/kv/initial_migration.go @@ -46,7 +46,7 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error { telegrafBucket, telegrafPluginsBucket, urmBucket, - notificationRuleBucket, + []byte("notificationRulev1"), userBucket, userIndex, sourceBucket, diff --git a/kv/notification_rule.go b/kv/notification_rule.go deleted file mode 100644 index fefe1b91be7..00000000000 --- a/kv/notification_rule.go +++ /dev/null @@ -1,498 +0,0 @@ -package kv - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/influxdata/influxdb/v2/notification/rule" - "go.uber.org/zap" - - "github.com/influxdata/influxdb/v2" -) - -var ( - notificationRuleBucket = []byte("notificationRulev1") - - // ErrNotificationRuleNotFound is used when the notification rule is not found. - ErrNotificationRuleNotFound = &influxdb.Error{ - Msg: "notification rule not found", - Code: influxdb.ENotFound, - } - - // ErrInvalidNotificationRuleID is used when the service was provided - // an invalid ID format. - ErrInvalidNotificationRuleID = &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "provided notification rule ID has invalid format", - } -) - -var _ influxdb.NotificationRuleStore = (*Service)(nil) - -// UnavailableNotificationRuleStoreError is used if we aren't able to interact with the -// store, it means the store is not available at the moment (e.g. network). -func UnavailableNotificationRuleStoreError(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInternal, - Msg: fmt.Sprintf("Unable to connect to notification rule store service. Please try again; Err: %v", err), - Op: "kv/notificationRule", - } -} - -// InternalNotificationRuleStoreError is used when the error comes from an -// internal system. -func InternalNotificationRuleStoreError(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInternal, - Msg: fmt.Sprintf("Unknown internal notificationRule data error; Err: %v", err), - Op: "kv/notificationRule", - } -} - -func (s *Service) notificationRuleBucket(tx Tx) (Bucket, error) { - b, err := tx.Bucket(notificationRuleBucket) - if err != nil { - return nil, UnavailableNotificationRuleStoreError(err) - } - return b, nil -} - -// CreateNotificationRule creates a new notification rule and sets b.ID with the new identifier. -func (s *Service) CreateNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { - return s.kv.Update(ctx, func(tx Tx) error { - return s.createNotificationRule(ctx, tx, nr, userID) - }) -} - -func (s *Service) createNotificationRule(ctx context.Context, tx Tx, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { - id := s.IDGenerator.ID() - nr.SetID(id) - now := s.TimeGenerator.Now() - nr.SetOwnerID(userID) - nr.SetCreatedAt(now) - nr.SetUpdatedAt(now) - - t, err := s.createNotificationTask(ctx, tx, nr) - if err != nil { - return err - } - - nr.SetTaskID(t.ID) - - if err := nr.Valid(); err != nil { - return err - } - - if err := nr.Status.Valid(); err != nil { - return err - } - - if err := s.putNotificationRule(ctx, tx, nr.NotificationRule); err != nil { - return err - } - - urm := &influxdb.UserResourceMapping{ - ResourceID: id, - UserID: userID, - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - } - return s.createUserResourceMapping(ctx, tx, urm) -} - -func (s *Service) createNotificationTask(ctx context.Context, tx Tx, r influxdb.NotificationRuleCreate) (*influxdb.Task, error) { - ep, err := s.findNotificationEndpointByID(ctx, tx, r.GetEndpointID()) - if err != nil { - return nil, err - } - - script, err := r.GenerateFlux(ep) - if err != nil { - return nil, err - } - - status := string(r.Status) - - tc := influxdb.TaskCreate{ - Type: r.Type(), - Flux: script, - OwnerID: r.GetOwnerID(), - OrganizationID: r.GetOrgID(), - Status: status, - } - - t, err := s.createTask(ctx, tx, tc) - if err != nil { - return nil, err - } - - return t, nil -} - -func (s *Service) updateNotificationTask(ctx context.Context, tx Tx, r influxdb.NotificationRule, status *string) (*influxdb.Task, error) { - ep, err := s.findNotificationEndpointByID(ctx, tx, r.GetEndpointID()) - if err != nil { - return nil, err - } - - script, err := r.GenerateFlux(ep) - if err != nil { - return nil, err - } - - tu := influxdb.TaskUpdate{ - Flux: &script, - Description: strPtr(r.GetDescription()), - Status: status, - } - - t, err := s.updateTask(ctx, tx, r.GetTaskID(), tu) - if err != nil { - return nil, err - } - - return t, nil -} - -// UpdateNotificationRule updates a single notification rule. -// Returns the new notification rule after update. -func (s *Service) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) { - var err error - var rule influxdb.NotificationRule - - err = s.kv.Update(ctx, func(tx Tx) error { - rule, err = s.updateNotificationRule(ctx, tx, id, nr, userID) - return err - }) - - return rule, err -} - -func (s *Service) updateNotificationRule(ctx context.Context, tx Tx, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) { - - current, err := s.findNotificationRuleByID(ctx, tx, id) - if err != nil { - return nil, err - } - - // ID and OrganizationID can not be updated - nr.SetID(current.GetID()) - nr.SetOrgID(current.GetOrgID()) - nr.SetOwnerID(current.GetOwnerID()) - nr.SetCreatedAt(current.GetCRUDLog().CreatedAt) - nr.SetUpdatedAt(s.TimeGenerator.Now()) - nr.SetTaskID(current.GetTaskID()) - - if err := nr.Valid(); err != nil { - return nil, err - } - - if err := nr.Status.Valid(); err != nil { - return nil, err - } - - _, err = s.updateNotificationTask(ctx, tx, nr, strPtr(string(nr.Status))) - if err != nil { - return nil, err - } - - if err := s.putNotificationRule(ctx, tx, nr.NotificationRule); err != nil { - return nil, err - } - - return nr.NotificationRule, nil -} - -// PatchNotificationRule updates a single notification rule with changeset. -// Returns the new notification rule state after update. -func (s *Service) PatchNotificationRule(ctx context.Context, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { - var nr influxdb.NotificationRule - if err := s.kv.Update(ctx, func(tx Tx) (err error) { - nr, err = s.patchNotificationRule(ctx, tx, id, upd) - if err != nil { - return err - } - return nil - }); err != nil { - return nil, err - } - - return nr, nil -} - -func (s *Service) patchNotificationRule(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { - nr, err := s.findNotificationRuleByID(ctx, tx, id) - if err != nil { - return nil, err - } - - if upd.Name != nil { - nr.SetName(*upd.Name) - } - if upd.Description != nil { - nr.SetDescription(*upd.Description) - } - var status *string - if upd.Status != nil { - status = strPtr(string(*upd.Status)) - } - - nr.SetUpdatedAt(s.TimeGenerator.Now()) - - if err := nr.Valid(); err != nil { - return nil, err - } - - _, err = s.updateNotificationTask(ctx, tx, nr, status) - if err != nil { - return nil, err - } - - err = s.putNotificationRule(ctx, tx, nr) - if err != nil { - return nil, err - } - - return nr, nil -} - -// PutNotificationRule put a notification rule to storage. -func (s *Service) PutNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate) error { - return s.kv.Update(ctx, func(tx Tx) (err error) { - if err := nr.Valid(); err != nil { - return err - } - - if err := nr.Status.Valid(); err != nil { - return err - } - - return s.putNotificationRule(ctx, tx, nr) - }) -} - -func (s *Service) putNotificationRule(ctx context.Context, tx Tx, nr influxdb.NotificationRule) error { - encodedID, _ := nr.GetID().Encode() - - v, err := json.Marshal(nr) - if err != nil { - return err - } - - bucket, err := s.notificationRuleBucket(tx) - if err != nil { - return err - } - - if err := bucket.Put(encodedID, v); err != nil { - return UnavailableNotificationRuleStoreError(err) - } - return nil -} - -// FindNotificationRuleByID returns a single notification rule by ID. -func (s *Service) FindNotificationRuleByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationRule, error) { - var ( - nr influxdb.NotificationRule - err error - ) - - err = s.kv.View(ctx, func(tx Tx) error { - nr, err = s.findNotificationRuleByID(ctx, tx, id) - return err - }) - - return nr, err -} - -func (s *Service) findNotificationRuleByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.NotificationRule, error) { - encID, err := id.Encode() - if err != nil { - return nil, ErrInvalidNotificationRuleID - } - - bucket, err := s.notificationRuleBucket(tx) - if err != nil { - return nil, err - } - - v, err := bucket.Get(encID) - if IsNotFound(err) { - return nil, ErrNotificationRuleNotFound - } - if err != nil { - return nil, InternalNotificationRuleStoreError(err) - } - - return rule.UnmarshalJSON(v) -} - -// FindNotificationRules returns a list of notification rules that match filter and the total count of matching notification rules. -// Additional options provide pagination & sorting. -func (s *Service) FindNotificationRules(ctx context.Context, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) (nrs []influxdb.NotificationRule, n int, err error) { - err = s.kv.View(ctx, func(tx Tx) error { - nrs, n, err = s.findNotificationRules(ctx, tx, filter, opt...) - return err - }) - return nrs, n, err -} - -func (s *Service) findNotificationRules(ctx context.Context, tx Tx, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationRule, int, error) { - nrs := make([]influxdb.NotificationRule, 0) - - m, err := s.findUserResourceMappings(ctx, tx, filter.UserResourceMappingFilter) - if err != nil { - return nil, 0, err - } - - if len(m) == 0 { - return nrs, 0, nil - } - - idMap := make(map[influxdb.ID]bool) - for _, item := range m { - idMap[item.ResourceID] = false - } - - if filter.OrgID != nil || filter.Organization != nil { - o, err := s.findOrganization(ctx, tx, influxdb.OrganizationFilter{ - ID: filter.OrgID, - Name: filter.Organization, - }) - - if err != nil { - return nrs, 0, err - } - filter.OrgID = &o.ID - } - - var offset, limit, count int - var descending bool - if len(opt) > 0 { - offset = opt[0].Offset - limit = opt[0].Limit - descending = opt[0].Descending - } - filterFn := filterNotificationRulesFn(idMap, filter) - err = s.forEachNotificationRule(ctx, tx, descending, func(nr influxdb.NotificationRule) bool { - if filterFn(nr) { - if count >= offset { - nrs = append(nrs, nr) - } - count++ - } - - if limit > 0 && len(nrs) >= limit { - return false - } - - return true - }) - - return nrs, len(nrs), err -} - -// forEachNotificationRule will iterate through all notification rules while fn returns true. -func (s *Service) forEachNotificationRule(ctx context.Context, tx Tx, descending bool, fn func(influxdb.NotificationRule) bool) error { - - bkt, err := s.notificationRuleBucket(tx) - if err != nil { - return err - } - - direction := CursorAscending - if descending { - direction = CursorDescending - } - - cur, err := bkt.ForwardCursor(nil, WithCursorDirection(direction)) - if err != nil { - return err - } - - for k, v := cur.Next(); k != nil; k, v = cur.Next() { - nr, err := rule.UnmarshalJSON(v) - if err != nil { - return err - } - if !fn(nr) { - break - } - } - - return nil -} - -func filterNotificationRulesFn(idMap map[influxdb.ID]bool, filter influxdb.NotificationRuleFilter) func(nr influxdb.NotificationRule) bool { - if filter.OrgID != nil { - return func(nr influxdb.NotificationRule) bool { - if !nr.MatchesTags(filter.Tags) { - return false - } - - _, ok := idMap[nr.GetID()] - return nr.GetOrgID() == *filter.OrgID && ok - } - } - - return func(nr influxdb.NotificationRule) bool { - if !nr.MatchesTags(filter.Tags) { - return false - } - - _, ok := idMap[nr.GetID()] - return ok - } -} - -// DeleteNotificationRule removes a notification rule by ID. -func (s *Service) DeleteNotificationRule(ctx context.Context, id influxdb.ID) error { - return s.kv.Update(ctx, func(tx Tx) error { - return s.deleteNotificationRule(ctx, tx, id) - }) -} - -func (s *Service) deleteNotificationRule(ctx context.Context, tx Tx, id influxdb.ID) error { - r, err := s.findNotificationRuleByID(ctx, tx, id) - if err != nil { - return err - } - - if err := s.deleteTask(ctx, tx, r.GetTaskID()); err != nil { - return err - } - - encodedID, err := id.Encode() - if err != nil { - return ErrInvalidNotificationRuleID - } - - bucket, err := s.notificationRuleBucket(tx) - if err != nil { - return err - } - - _, err = bucket.Get(encodedID) - if IsNotFound(err) { - return ErrNotificationRuleNotFound - } - if err != nil { - return InternalNotificationRuleStoreError(err) - } - - if err := bucket.Delete(encodedID); err != nil { - return InternalNotificationRuleStoreError(err) - } - - if err := s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{ - ResourceID: id, - ResourceType: influxdb.NotificationRuleResourceType, - }); err != nil { - // TODO(desa): it is possible that there were no user resource mappings for a resource so this likely shouldn't be a blocking - // condition for deleting a notification rule. - s.log.Info("Failed to remove user resource mappings for notification rule", zap.Error(err), zap.Stringer("rule_id", id)) - } - - return nil -} diff --git a/kv/notification_rule_test.go b/kv/notification_rule_test.go deleted file mode 100644 index fdb054ef83e..00000000000 --- a/kv/notification_rule_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package kv_test - -import ( - "context" - "testing" - - "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kv" - "github.com/influxdata/influxdb/v2/query/fluxlang" - influxdbtesting "github.com/influxdata/influxdb/v2/testing" - "go.uber.org/zap/zaptest" -) - -func TestBoltNotificationRuleStore(t *testing.T) { - influxdbtesting.NotificationRuleStore(initBoltNotificationRuleStore, t) -} - -func initBoltNotificationRuleStore(f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) { - s, closeBolt, err := NewTestBoltStore(t) - if err != nil { - t.Fatalf("failed to create new kv store: %v", err) - } - - svc, closeSvc := initNotificationRuleStore(s, f, t) - return svc, func() { - closeSvc() - closeBolt() - } -} - -func initNotificationRuleStore(s kv.SchemaStore, f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) { - ctx := context.Background() - svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{ - FluxLanguageService: fluxlang.DefaultService, - }) - svc.IDGenerator = f.IDGenerator - svc.TimeGenerator = f.TimeGenerator - if f.TimeGenerator == nil { - svc.TimeGenerator = influxdb.RealTimeGenerator{} - } - - for _, o := range f.Orgs { - if err := svc.PutOrganization(ctx, o); err != nil { - t.Fatalf("failed to populate org: %v", err) - } - } - - for _, m := range f.UserResourceMappings { - if err := svc.CreateUserResourceMapping(ctx, m); err != nil { - t.Fatalf("failed to populate user resource mapping: %v", err) - } - } - - for _, e := range f.Endpoints { - if err := svc.CreateNotificationEndpoint(ctx, e, 1); err != nil { - t.Fatalf("failed to populate notification endpoint: %v", err) - } - } - - for _, nr := range f.NotificationRules { - nrc := influxdb.NotificationRuleCreate{ - NotificationRule: nr, - Status: influxdb.Active, - } - if err := svc.PutNotificationRule(ctx, nrc); err != nil { - t.Fatalf("failed to populate notification rule: %v", err) - } - } - - for _, c := range f.Tasks { - if _, err := svc.CreateTask(ctx, c); err != nil { - t.Fatalf("failed to populate task: %v", err) - } - } - - return svc, func() { - for _, nr := range f.NotificationRules { - if err := svc.DeleteNotificationRule(ctx, nr.GetID()); err != nil { - t.Logf("failed to remove notification rule: %v", err) - } - } - for _, urm := range f.UserResourceMappings { - if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound { - t.Logf("failed to remove urm rule: %v", err) - } - } - for _, o := range f.Orgs { - if err := svc.DeleteOrganization(ctx, o.ID); err != nil { - t.Fatalf("failed to remove org: %v", err) - } - } - } -} diff --git a/kv/org.go b/kv/org.go index 0af68c7f6d7..ef6720e5de5 100644 --- a/kv/org.go +++ b/kv/org.go @@ -717,88 +717,6 @@ func (s *Service) appendOrganizationEventToLog(ctx context.Context, tx Tx, id in return s.addLogEntry(ctx, tx, k, v, s.Now()) } -// FindResourceOrganizationID is used to find the organization that a resource belongs to five the id of a resource and a resource type. -func (s *Service) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) { - switch rt { - case influxdb.AuthorizationsResourceType: - r, err := s.FindAuthorizationByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.BucketsResourceType: - r, err := s.FindBucketByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.OrgsResourceType: - r, err := s.FindOrganizationByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.ID, nil - case influxdb.DashboardsResourceType: - r, err := s.FindDashboardByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.SourcesResourceType: - r, err := s.FindSourceByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.TasksResourceType: - r, err := s.FindTaskByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.TelegrafsResourceType: - r, err := s.FindTelegrafConfigByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.VariablesResourceType: - r, err := s.FindVariableByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrganizationID, nil - case influxdb.ScraperResourceType: - r, err := s.GetTargetByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.OrgID, nil - case influxdb.ChecksResourceType: - r, err := s.FindCheckByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.GetOrgID(), nil - case influxdb.NotificationEndpointResourceType: - r, err := s.FindNotificationEndpointByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.GetOrgID(), nil - case influxdb.NotificationRuleResourceType: - r, err := s.FindNotificationRuleByID(ctx, id) - if err != nil { - return influxdb.InvalidID(), err - } - return r.GetOrgID(), nil - } - - return influxdb.InvalidID(), &influxdb.Error{ - Msg: fmt.Sprintf("unsupported resource type %s", rt), - } -} - // OrgAlreadyExistsError is used when creating a new organization with // a name that has already been used. Organization names must be unique. func OrgAlreadyExistsError(o *influxdb.Organization) error { diff --git a/label/middleware_auth.go b/label/middleware_auth.go index 2c117aaa035..bbb68b3fedf 100644 --- a/label/middleware_auth.go +++ b/label/middleware_auth.go @@ -2,7 +2,6 @@ package label import ( "context" - "errors" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/authorizer" @@ -11,15 +10,15 @@ import ( var _ influxdb.LabelService = (*AuthedLabelService)(nil) type AuthedLabelService struct { - s influxdb.LabelService - orgSvc authorizer.OrganizationService + s influxdb.LabelService + orgIDResolver authorizer.OrgIDResolver } // NewAuthedLabelService constructs an instance of an authorizing label serivce. -func NewAuthedLabelService(s influxdb.LabelService, orgSvc authorizer.OrganizationService) *AuthedLabelService { +func NewAuthedLabelService(s influxdb.LabelService, orgIDResolver authorizer.OrgIDResolver) *AuthedLabelService { return &AuthedLabelService{ - s: s, - orgSvc: orgSvc, + s: s, + orgIDResolver: orgIDResolver, } } func (s *AuthedLabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error { @@ -59,10 +58,7 @@ func (s *AuthedLabelService) FindResourceLabels(ctx context.Context, filter infl return nil, err } - if s.orgSvc == nil { - return nil, errors.New("failed to find orgSvc") - } - orgID, err := s.orgSvc.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) + orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID) if err != nil { return nil, err } diff --git a/notification/rule/service/service.go b/notification/rule/service/service.go new file mode 100644 index 00000000000..de4aa40d833 --- /dev/null +++ b/notification/rule/service/service.go @@ -0,0 +1,497 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/notification/rule" + "github.com/influxdata/influxdb/v2/pkg/pointer" + "github.com/influxdata/influxdb/v2/snowflake" + "go.uber.org/zap" +) + +var ( + notificationRuleBucket = []byte("notificationRulev1") + + // ErrNotificationRuleNotFound is used when the notification rule is not found. + ErrNotificationRuleNotFound = &influxdb.Error{ + Msg: "notification rule not found", + Code: influxdb.ENotFound, + } + + // ErrInvalidNotificationRuleID is used when the service was provided + // an invalid ID format. + ErrInvalidNotificationRuleID = &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "provided notification rule ID has invalid format", + } +) + +// RuleService is an implementation of the influxdb CheckService +// It is backed by the kv store abstraction. +type RuleService struct { + log *zap.Logger + + kv kv.Store + tasks influxdb.TaskService + orgs influxdb.OrganizationService + endpoints influxdb.NotificationEndpointService + + idGenerator influxdb.IDGenerator + timeGenerator influxdb.TimeGenerator +} + +// NewRuleService constructs and configures a notification rule service +func NewRuleService(logger *zap.Logger, store kv.Store, tasks influxdb.TaskService, orgs influxdb.OrganizationService, endpoints influxdb.NotificationEndpointService) (*RuleService, error) { + s := &RuleService{ + log: logger, + kv: store, + tasks: tasks, + orgs: orgs, + endpoints: endpoints, + timeGenerator: influxdb.RealTimeGenerator{}, + idGenerator: snowflake.NewIDGenerator(), + } + + ctx := context.Background() + if err := store.Update(ctx, func(tx kv.Tx) error { + return s.initializeNotificationRule(ctx, tx) + }); err != nil { + return nil, err + } + + return s, nil +} + +var _ influxdb.NotificationRuleStore = (*RuleService)(nil) + +func (s *RuleService) initializeNotificationRule(ctx context.Context, tx kv.Tx) error { + if _, err := s.notificationRuleBucket(tx); err != nil { + return err + } + return nil +} + +// UnavailableNotificationRuleStoreError is used if we aren't able to interact with the +// store, it means the store is not available at the moment (e.g. network). +func UnavailableNotificationRuleStoreError(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unable to connect to notification rule store service. Please try again; Err: %v", err), + Op: "kv/notificationRule", + } +} + +// InternalNotificationRuleStoreError is used when the error comes from an +// internal system. +func InternalNotificationRuleStoreError(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unknown internal notificationRule data error; Err: %v", err), + Op: "kv/notificationRule", + } +} + +func (s *RuleService) notificationRuleBucket(tx kv.Tx) (kv.Bucket, error) { + b, err := tx.Bucket(notificationRuleBucket) + if err != nil { + return nil, UnavailableNotificationRuleStoreError(err) + } + return b, nil +} + +// CreateNotificationRule creates a new notification rule and sets b.ID with the new identifier. +func (s *RuleService) CreateNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { + // set notification rule ID + id := s.idGenerator.ID() + nr.SetID(id) + + // set notification rule created / updated times + now := s.timeGenerator.Now() + nr.SetOwnerID(userID) + nr.SetCreatedAt(now) + nr.SetUpdatedAt(now) + + // create backing task and set ID (in inactive state initially) + t, err := s.createNotificationTask(ctx, nr) + if err != nil { + return err + } + + nr.SetTaskID(t.ID) + + if err := s.kv.Update(ctx, func(tx kv.Tx) error { + return s.createNotificationRule(ctx, tx, nr, userID) + }); err != nil { + // remove associated task + if derr := s.tasks.DeleteTask(ctx, t.ID); derr != nil { + s.log.Error("failed to remove task for invalid notification rule", zap.Error(derr)) + } + + return err + } + + // set task to notification rule create status + _, err = s.tasks.UpdateTask(ctx, t.ID, influxdb.TaskUpdate{Status: pointer.String(string(nr.Status))}) + return err +} + +func (s *RuleService) createNotificationRule(ctx context.Context, tx kv.Tx, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error { + if err := nr.Valid(); err != nil { + return err + } + + if err := nr.Status.Valid(); err != nil { + return err + } + + return s.putNotificationRule(ctx, tx, nr.NotificationRule) +} + +func (s *RuleService) createNotificationTask(ctx context.Context, r influxdb.NotificationRuleCreate) (*influxdb.Task, error) { + ep, err := s.endpoints.FindNotificationEndpointByID(ctx, r.GetEndpointID()) + if err != nil { + return nil, err + } + + script, err := r.GenerateFlux(ep) + if err != nil { + return nil, err + } + + tc := influxdb.TaskCreate{ + Type: r.Type(), + Flux: script, + OwnerID: r.GetOwnerID(), + OrganizationID: r.GetOrgID(), + // create task initially in inactive status + Status: string(influxdb.Inactive), + } + + t, err := s.tasks.CreateTask(ctx, tc) + if err != nil { + return nil, err + } + + return t, nil +} + +// UpdateNotificationRule updates a single notification rule. +// Returns the new notification rule after update. +func (s *RuleService) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) { + rule, err := s.FindNotificationRuleByID(ctx, id) + if err != nil { + return nil, err + } + + // ID and OrganizationID can not be updated + nr.SetID(rule.GetID()) + nr.SetOrgID(rule.GetOrgID()) + nr.SetOwnerID(rule.GetOwnerID()) + nr.SetCreatedAt(rule.GetCRUDLog().CreatedAt) + nr.SetUpdatedAt(s.timeGenerator.Now()) + nr.SetTaskID(rule.GetTaskID()) + + if err := nr.Valid(); err != nil { + return nil, err + } + + if err := nr.Status.Valid(); err != nil { + return nil, err + } + + _, err = s.updateNotificationTask(ctx, nr, pointer.String(string(nr.Status))) + if err != nil { + return nil, err + } + + err = s.kv.Update(ctx, func(tx kv.Tx) error { + return s.putNotificationRule(ctx, tx, nr.NotificationRule) + }) + + return nr.NotificationRule, err +} + +func (s *RuleService) updateNotificationTask(ctx context.Context, r influxdb.NotificationRule, status *string) (*influxdb.Task, error) { + ep, err := s.endpoints.FindNotificationEndpointByID(ctx, r.GetEndpointID()) + if err != nil { + return nil, err + } + + script, err := r.GenerateFlux(ep) + if err != nil { + return nil, err + } + + tu := influxdb.TaskUpdate{ + Flux: &script, + Description: pointer.String(r.GetDescription()), + Status: status, + } + + t, err := s.tasks.UpdateTask(ctx, r.GetTaskID(), tu) + if err != nil { + return nil, err + } + + return t, nil +} + +// PatchNotificationRule updates a single notification rule with changeset. +// Returns the new notification rule state after update. +func (s *RuleService) PatchNotificationRule(ctx context.Context, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { + nr, err := s.FindNotificationRuleByID(ctx, id) + if err != nil { + return nil, err + } + + if upd.Name != nil { + nr.SetName(*upd.Name) + } + if upd.Description != nil { + nr.SetDescription(*upd.Description) + } + + var status *string + if upd.Status != nil { + status = pointer.String(string(*upd.Status)) + } + + nr.SetUpdatedAt(s.timeGenerator.Now()) + if err := nr.Valid(); err != nil { + return nil, err + } + + _, err = s.updateNotificationTask(ctx, nr, status) + if err != nil { + return nil, err + } + + if err := s.kv.Update(ctx, func(tx kv.Tx) (err error) { + return s.putNotificationRule(ctx, tx, nr) + }); err != nil { + return nil, err + } + + return nr, nil +} + +// PutNotificationRule put a notification rule to storage. +func (s *RuleService) PutNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate) error { + return s.kv.Update(ctx, func(tx kv.Tx) (err error) { + if err := nr.Valid(); err != nil { + return err + } + + if err := nr.Status.Valid(); err != nil { + return err + } + + return s.putNotificationRule(ctx, tx, nr) + }) +} + +func (s *RuleService) putNotificationRule(ctx context.Context, tx kv.Tx, nr influxdb.NotificationRule) error { + encodedID, _ := nr.GetID().Encode() + + v, err := json.Marshal(nr) + if err != nil { + return err + } + + bucket, err := s.notificationRuleBucket(tx) + if err != nil { + return err + } + + if err := bucket.Put(encodedID, v); err != nil { + return UnavailableNotificationRuleStoreError(err) + } + return nil +} + +// FindNotificationRuleByID returns a single notification rule by ID. +func (s *RuleService) FindNotificationRuleByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationRule, error) { + var ( + nr influxdb.NotificationRule + err error + ) + + err = s.kv.View(ctx, func(tx kv.Tx) error { + nr, err = s.findNotificationRuleByID(ctx, tx, id) + return err + }) + + return nr, err +} + +func (s *RuleService) findNotificationRuleByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (influxdb.NotificationRule, error) { + encID, err := id.Encode() + if err != nil { + return nil, ErrInvalidNotificationRuleID + } + + bucket, err := s.notificationRuleBucket(tx) + if err != nil { + return nil, err + } + + v, err := bucket.Get(encID) + if kv.IsNotFound(err) { + return nil, ErrNotificationRuleNotFound + } + if err != nil { + return nil, InternalNotificationRuleStoreError(err) + } + + return rule.UnmarshalJSON(v) +} + +// FindNotificationRules returns a list of notification rules that match filter and the total count of matching notification rules. +// Additional options provide pagination & sorting. +func (s *RuleService) FindNotificationRules(ctx context.Context, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) (nrs []influxdb.NotificationRule, n int, err error) { + if filter.OrgID == nil && filter.Organization != nil { + o, err := s.orgs.FindOrganization(ctx, influxdb.OrganizationFilter{ + Name: filter.Organization, + }) + + if err != nil { + return nrs, 0, err + } + + filter.OrgID = &o.ID + } + + err = s.kv.View(ctx, func(tx kv.Tx) error { + nrs, n, err = s.findNotificationRules(ctx, tx, filter, opt...) + return err + }) + + return nrs, n, err +} + +func (s *RuleService) findNotificationRules(ctx context.Context, tx kv.Tx, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationRule, int, error) { + var ( + nrs = make([]influxdb.NotificationRule, 0) + offset int + limit int + count int + descending bool + ) + + if len(opt) > 0 { + offset = opt[0].Offset + limit = opt[0].Limit + descending = opt[0].Descending + } + + filterFn := filterNotificationRulesFn(filter) + err := s.forEachNotificationRule(ctx, tx, descending, func(nr influxdb.NotificationRule) bool { + if filterFn(nr) { + if count >= offset { + nrs = append(nrs, nr) + } + count++ + } + + if limit > 0 && len(nrs) >= limit { + return false + } + + return true + }) + + return nrs, len(nrs), err +} + +// forEachNotificationRule will iterate through all notification rules while fn returns true. +func (s *RuleService) forEachNotificationRule(ctx context.Context, tx kv.Tx, descending bool, fn func(influxdb.NotificationRule) bool) error { + + bkt, err := s.notificationRuleBucket(tx) + if err != nil { + return err + } + + direction := kv.CursorAscending + if descending { + direction = kv.CursorDescending + } + + cur, err := bkt.ForwardCursor(nil, kv.WithCursorDirection(direction)) + if err != nil { + return err + } + + for k, v := cur.Next(); k != nil; k, v = cur.Next() { + nr, err := rule.UnmarshalJSON(v) + if err != nil { + return err + } + if !fn(nr) { + break + } + } + + return nil +} + +func filterNotificationRulesFn(filter influxdb.NotificationRuleFilter) func(nr influxdb.NotificationRule) bool { + if filter.OrgID != nil { + return func(nr influxdb.NotificationRule) bool { + if !nr.MatchesTags(filter.Tags) { + return false + } + + return nr.GetOrgID() == *filter.OrgID + } + } + + return func(nr influxdb.NotificationRule) bool { + return nr.MatchesTags(filter.Tags) + } +} + +// DeleteNotificationRule removes a notification rule by ID. +func (s *RuleService) DeleteNotificationRule(ctx context.Context, id influxdb.ID) error { + r, err := s.FindNotificationRuleByID(ctx, id) + if err != nil { + return err + } + + if err := s.tasks.DeleteTask(ctx, r.GetTaskID()); err != nil { + return err + } + + return s.kv.Update(ctx, func(tx kv.Tx) error { + return s.deleteNotificationRule(ctx, tx, r) + }) +} + +func (s *RuleService) deleteNotificationRule(ctx context.Context, tx kv.Tx, r influxdb.NotificationRule) error { + encodedID, err := r.GetID().Encode() + if err != nil { + return ErrInvalidNotificationRuleID + } + + bucket, err := s.notificationRuleBucket(tx) + if err != nil { + return err + } + + _, err = bucket.Get(encodedID) + if kv.IsNotFound(err) { + return ErrNotificationRuleNotFound + } + if err != nil { + return InternalNotificationRuleStoreError(err) + } + + if err := bucket.Delete(encodedID); err != nil { + return InternalNotificationRuleStoreError(err) + } + + return nil +} diff --git a/testing/notification_rule.go b/notification/rule/service/service_external_test.go similarity index 66% rename from testing/notification_rule.go rename to notification/rule/service/service_external_test.go index c4db726efae..97f0c1d050c 100644 --- a/testing/notification_rule.go +++ b/notification/rule/service/service_external_test.go @@ -1,28 +1,49 @@ -package testing +package service import ( + "bytes" "context" + "reflect" "sort" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/influxdata/influxdb/v2" + "github.com/influxdata/flux/ast" + influxdb "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/notification" "github.com/influxdata/influxdb/v2/notification/endpoint" "github.com/influxdata/influxdb/v2/notification/rule" + "github.com/influxdata/influxdb/v2/pkg/pointer" +) + +const ( + oneID = "020f755c3c082000" + twoID = "020f755c3c082001" + threeID = "020f755c3c082002" + fourID = "020f755c3c082003" + fiveID = "020f755c3c082004" + sixID = "020f755c3c082005" +) + +var ( + fakeDate = time.Date(2006, 5, 4, 1, 2, 3, 0, time.UTC) + fakeGenerator = mock.TimeGenerator{FakeValue: fakeDate} + timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)} + timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)} + time3 = time.Date(2006, time.July, 15, 5, 23, 53, 10, time.UTC) ) // NotificationRuleFields includes prepopulated data for mapping tests. type NotificationRuleFields struct { - IDGenerator influxdb.IDGenerator - TimeGenerator influxdb.TimeGenerator - NotificationRules []influxdb.NotificationRule - Orgs []*influxdb.Organization - UserResourceMappings []*influxdb.UserResourceMapping - Tasks []influxdb.TaskCreate - Endpoints []influxdb.NotificationEndpoint + IDGenerator influxdb.IDGenerator + TimeGenerator influxdb.TimeGenerator + NotificationRules []influxdb.NotificationRule + Orgs []*influxdb.Organization + Tasks []influxdb.TaskCreate + Endpoints []influxdb.NotificationEndpoint } var notificationRuleCmpOptions = cmp.Options{ @@ -36,14 +57,36 @@ var notificationRuleCmpOptions = cmp.Options{ }), } +var taskCmpOptions = cmp.Options{ + cmp.Comparer(func(x, y []byte) bool { + return bytes.Equal(x, y) + }), + // skip comparing permissions + cmpopts.IgnoreFields( + influxdb.Task{}, + "LatestCompleted", + "LatestScheduled", + "CreatedAt", + "UpdatedAt", + ), + cmp.Transformer("Sort", func(in []*influxdb.Task) []*influxdb.Task { + out := append([]*influxdb.Task{}, in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID > out[j].ID + }) + return out + }), +} + +type notificationRuleFactory func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) + // NotificationRuleStore tests all the service functions. func NotificationRuleStore( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), t *testing.T, + init notificationRuleFactory, t *testing.T, ) { tests := []struct { name string - fn func(init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), - t *testing.T) + fn func(notificationRuleFactory, *testing.T) }{ { name: "CreateNotificationRule", @@ -72,8 +115,6 @@ func NotificationRuleStore( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt := tt - t.Parallel() tt.fn(init, t) }) } @@ -81,7 +122,7 @@ func NotificationRuleStore( // CreateNotificationRule testing. func CreateNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -89,9 +130,9 @@ func CreateNotificationRule( userID influxdb.ID } type wants struct { - err error - notificationRules []influxdb.NotificationRule - userResourceMapping []*influxdb.UserResourceMapping + err error + notificationRule influxdb.NotificationRule + task *influxdb.Task } tests := []struct { @@ -117,7 +158,7 @@ func CreateNotificationRule( Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -167,14 +208,6 @@ func CreateNotificationRule( MessageTemplate: "msg1", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - }, }, args: args{ userID: MustIDBase16(sixID), @@ -213,7 +246,84 @@ func CreateNotificationRule( }, }, wants: wants{ - notificationRules: []influxdb.NotificationRule{ + notificationRule: &rule.Slack{ + Base: rule.Base{ + ID: MustIDBase16(twoID), + Name: "name2", + OwnerID: MustIDBase16(sixID), + OrgID: MustIDBase16(fourID), + EndpointID: MustIDBase16(twoID), + RunbookLink: "runbooklink1", + SleepUntil: &time3, + Every: mustDuration("1h"), + StatusRules: []notification.StatusRule{ + { + CurrentLevel: notification.Critical, + }, + }, + TagRules: []notification.TagRule{ + { + Tag: influxdb.Tag{ + Key: "k1", + Value: "v1", + }, + Operator: influxdb.NotEqual, + }, + { + Tag: influxdb.Tag{ + Key: "k2", + Value: "v2", + }, + Operator: influxdb.RegexEqual, + }, + }, + CRUDLog: influxdb.CRUDLog{ + CreatedAt: fakeDate, + UpdatedAt: fakeDate, + }, + }, + MessageTemplate: "msg1", + }, + task: &influxdb.Task{ + ID: MustIDBase16("020f755c3c082001"), + Type: "slack", + OrganizationID: MustIDBase16("020f755c3c082003"), + Organization: "org", + OwnerID: MustIDBase16("020f755c3c082005"), + Name: "name2", + Status: "active", + Flux: "package main\n// name2\nimport \"influxdata/influxdb/monitor\"\nimport \"slack\"\nimport \"influxdata/influxdb/secrets\"\nimport \"experimental\"\n\noption task = {name: \"name2\", every: 1h}\n\nslack_secret = secrets[\"get\"](key: \"020f755c3c082001-token\")\nslack_endpoint = slack[\"endpoint\"](token: slack_secret, url: \"http://localhost:7777\")\nnotification = {\n\t_notification_rule_id: \"020f755c3c082001\",\n\t_notification_rule_name: \"name2\",\n\t_notification_endpoint_id: \"020f755c3c082001\",\n\t_notification_endpoint_name: \"foo\",\n}\nstatuses = monitor[\"from\"](start: -2h, fn: (r) =>\n\t(r[\"k1\"] == \"v1\" and r[\"k2\"] == \"v2\"))\ncrit = statuses\n\t|> filter(fn: (r) =>\n\t\t(r[\"_level\"] == \"crit\"))\nall_statuses = crit\n\t|> filter(fn: (r) =>\n\t\t(r[\"_time\"] >= experimental[\"subDuration\"](from: now(), d: 1h)))\n\nall_statuses\n\t|> monitor[\"notify\"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>\n\t\t({channel: \"\", text: \"msg1\", color: if r[\"_level\"] == \"crit\" then \"danger\" else if r[\"_level\"] == \"warn\" then \"warning\" else \"good\"})))", + Every: "1h", + }, + }, + }, + { + name: "invalid tag rule value", + fields: NotificationRuleFields{ + IDGenerator: mock.NewIDGenerator(twoID, t), + TimeGenerator: fakeGenerator, + Orgs: []*influxdb.Organization{ + { + Name: "org", + ID: MustIDBase16(fourID), + }, + }, + Endpoints: []influxdb.NotificationEndpoint{ + &endpoint.Slack{ + URL: "http://localhost:7777", + Token: influxdb.SecretField{ + // TODO(desa): not sure why this has to end in token, but it does + Key: "020f755c3c082001-token", + Value: pointer.String("abc123"), + }, + Base: endpoint.Base{ + OrgID: MustIDBase16Ptr(fourID), + Name: "foo", + Status: influxdb.Active, + }, + }, + }, + NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ ID: MustIDBase16(oneID), @@ -253,58 +363,49 @@ func CreateNotificationRule( Channel: "channel1", MessageTemplate: "msg1", }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(twoID), - Name: "name2", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink1", - SleepUntil: &time3, - Every: mustDuration("1h"), - StatusRules: []notification.StatusRule{ - { - CurrentLevel: notification.Critical, - }, + }, + }, + args: args{ + userID: MustIDBase16(sixID), + notificationRule: &rule.Slack{ + Base: rule.Base{ + OwnerID: MustIDBase16(sixID), + OrgID: MustIDBase16(fourID), + Name: "name2", + EndpointID: MustIDBase16(twoID), + RunbookLink: "runbooklink1", + SleepUntil: &time3, + Every: mustDuration("1h"), + StatusRules: []notification.StatusRule{ + { + CurrentLevel: notification.Critical, }, - TagRules: []notification.TagRule{ - { - Tag: influxdb.Tag{ - Key: "k1", - Value: "v1", - }, - Operator: influxdb.NotEqual, - }, - { - Tag: influxdb.Tag{ - Key: "k2", - Value: "v2", - }, - Operator: influxdb.RegexEqual, + }, + TagRules: []notification.TagRule{ + { + Tag: influxdb.Tag{ + Key: "k1", + Value: "v1", }, + Operator: influxdb.NotEqual, }, - CRUDLog: influxdb.CRUDLog{ - CreatedAt: fakeDate, - UpdatedAt: fakeDate, + { + Tag: influxdb.Tag{ + Key: "k2", + // empty tag value to trigger validation error + Value: "", + }, + Operator: influxdb.RegexEqual, }, }, - MessageTemplate: "msg1", }, + MessageTemplate: "msg1", }, - userResourceMapping: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, + }, + wants: wants{ + err: &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "tag must contain a key and a value", }, }, }, @@ -312,7 +413,7 @@ func CreateNotificationRule( for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, tasks, done := init(tt.fields, t) defer done() ctx := context.Background() nrc := influxdb.NotificationRuleCreate{ @@ -320,33 +421,54 @@ func CreateNotificationRule( Status: influxdb.Active, } err := s.CreateNotificationRule(ctx, nrc, tt.args.userID) - if (err != nil) != (tt.wants.err != nil) { - t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err) - } - if tt.wants.err == nil && !tt.args.notificationRule.GetID().Valid() { - t.Fatalf("notification rule ID not set from CreateNotificationRule") - } + if tt.wants.err != nil { + // expected error case + if !reflect.DeepEqual(tt.wants.err, err) { + t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err) + } - if err != nil && tt.wants.err != nil { - if influxdb.ErrorCode(err) != influxdb.ErrorCode(tt.wants.err) { - t.Fatalf("expected error messages to match '%v' got '%v'", influxdb.ErrorCode(tt.wants.err), influxdb.ErrorCode(err)) + // ensure no rules can be located + _, n, err := s.FindNotificationRules(ctx, influxdb.NotificationRuleFilter{}) + if err != nil { + t.Fatal(err) } - } - urmFilter := influxdb.UserResourceMappingFilter{ - UserID: tt.args.userID, - ResourceType: influxdb.NotificationRuleResourceType, + if existing := len(tt.fields.NotificationRules); n > existing { + t.Errorf("expected no rules to be created, found %d", n-existing) + } + } else { + nr, err := s.FindNotificationRuleByID(ctx, tt.args.notificationRule.GetID()) + if err != nil { + t.Errorf("failed to retrieve notification rules: %v", err) + } + + if diff := cmp.Diff(nr, tt.wants.notificationRule, notificationRuleCmpOptions...); diff != "" { + t.Errorf("notificationRules are different -got/+want\ndiff %s", diff) + } } - filter := influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: urmFilter, + if tt.wants.task == nil || !tt.wants.task.ID.Valid() { + // if not tasks or a task with an invalid ID is provided (0) then assume + // no tasks should be persisted + _, n, err := tasks.FindTasks(ctx, influxdb.TaskFilter{}) + if err != nil { + t.Fatal(err) + } + + if n > 0 { + t.Errorf("expected zero tasks to be created, instead found %d", n) + } + + return } - nrs, _, err := s.FindNotificationRules(ctx, filter) + + task, err := tasks.FindTaskByID(ctx, tt.wants.task.ID) if err != nil { - t.Fatalf("failed to retrieve notification rules: %v", err) + t.Fatal(err) } - if diff := cmp.Diff(nrs, tt.wants.notificationRules, notificationRuleCmpOptions...); diff != "" { - t.Errorf("notificationRules are different -got/+want\ndiff %s", diff) + + if diff := cmp.Diff(task, tt.wants.task, taskCmpOptions...); diff != "" { + t.Errorf("task is different -got/+want\ndiff %s", diff) } }) } @@ -354,7 +476,7 @@ func CreateNotificationRule( // FindNotificationRuleByID testing. func FindNotificationRuleByID( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -374,20 +496,6 @@ func FindNotificationRuleByID( { name: "bad id", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -439,20 +547,6 @@ func FindNotificationRuleByID( { name: "not found", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -504,20 +598,6 @@ func FindNotificationRuleByID( { name: "basic find telegraf config by id", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -582,7 +662,7 @@ func FindNotificationRuleByID( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -597,7 +677,7 @@ func FindNotificationRuleByID( // FindNotificationRules testing func FindNotificationRules( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -618,15 +698,10 @@ func FindNotificationRules( { name: "find nothing (empty set)", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{}, - NotificationRules: []influxdb.NotificationRule{}, + NotificationRules: []influxdb.NotificationRule{}, }, args: args{ - filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, + filter: influxdb.NotificationRuleFilter{}, }, wants: wants{ notificationRules: []influxdb.NotificationRule{}, @@ -635,20 +710,6 @@ func FindNotificationRules( { name: "find all notification rules", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -688,12 +749,7 @@ func FindNotificationRules( }, }, args: args{ - filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(sixID), - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, + filter: influxdb.NotificationRuleFilter{}, }, wants: wants{ notificationRules: []influxdb.NotificationRule{ @@ -735,93 +791,6 @@ func FindNotificationRules( }, }, }, - { - name: "find owners only", - fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, - NotificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(oneID), - Name: "name1", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink1", - SleepUntil: &time3, - Every: mustDuration("1h"), - CRUDLog: influxdb.CRUDLog{ - CreatedAt: timeGen1.Now(), - UpdatedAt: timeGen2.Now(), - }, - }, - Channel: "channel1", - MessageTemplate: "msg1", - }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(twoID), - Name: "name2", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink2", - SleepUntil: &time3, - Every: mustDuration("1h"), - CRUDLog: influxdb.CRUDLog{ - CreatedAt: timeGen1.Now(), - UpdatedAt: timeGen2.Now(), - }, - }, - MessageTemplate: "msg", - }, - }, - }, - args: args{ - filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(sixID), - ResourceType: influxdb.NotificationRuleResourceType, - UserType: influxdb.Owner, - }, - }, - }, - wants: wants{ - notificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(oneID), - Name: "name1", - OwnerID: MustIDBase16(sixID), - OrgID: MustIDBase16(fourID), - EndpointID: MustIDBase16(twoID), - RunbookLink: "runbooklink1", - SleepUntil: &time3, - Every: mustDuration("1h"), - CRUDLog: influxdb.CRUDLog{ - CreatedAt: timeGen1.Now(), - UpdatedAt: timeGen2.Now(), - }, - }, - Channel: "channel1", - MessageTemplate: "msg1", - }, - }, - }, - }, { name: "filter by organization id only", fields: NotificationRuleFields{ @@ -835,26 +804,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -891,7 +840,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - OrgID: idPtr(MustIDBase16(oneID)), + OrgID: MustIDBase16Ptr(oneID), }, }, wants: wants{ @@ -922,26 +871,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -978,7 +907,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - Organization: strPtr("org4"), + Organization: pointer.String("org4"), }, }, wants: wants{ @@ -1007,256 +936,6 @@ func FindNotificationRules( }, }, }, - // { - // name: "filter by tags", - // fields: NotificationRuleFields{ - // Orgs: []*influxdb.Organization{ - // { - // ID: MustIDBase16(oneID), - // Name: "org1", - // }, - // { - // ID: MustIDBase16(fourID), - // Name: "org4", - // }, - // }, - // UserResourceMappings: []*influxdb.UserResourceMapping{ - // { - // ResourceID: MustIDBase16(oneID), - // ResourceType: influxdb.NotificationRuleResourceType, - // UserID: MustIDBase16(sixID), - // UserType: influxdb.Owner, - // }, - // { - // ResourceID: MustIDBase16(twoID), - // ResourceType: influxdb.NotificationRuleResourceType, - // UserID: MustIDBase16(sixID), - // UserType: influxdb.Member, - // }, - // { - // ResourceID: MustIDBase16(fourID), - // ResourceType: influxdb.NotificationRuleResourceType, - // UserID: MustIDBase16(sixID), - // UserType: influxdb.Owner, - // }, - // }, - // NotificationRules: []influxdb.NotificationRule{ - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(oneID), - // OrgID: MustIDBase16(fourID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr1", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // Channel: "ch1", - // MessageTemplate: "msg1", - // }, - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(twoID), - // OrgID: MustIDBase16(fourID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr2", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // { - // Tag: influxdb.Tag{ - // Key: "location", - // Value: "paris", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // MessageTemplate: "body2", - // }, - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(fourID), - // OrgID: MustIDBase16(oneID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr3", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // { - // Tag: influxdb.Tag{ - // Key: "location", - // Value: "paris", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // MessageTemplate: "msg", - // }, - // }, - // }, - // args: args{ - // filter: influxdb.NotificationRuleFilter{ - // Organization: strPtr("org4"), - // Tags: []influxdb.Tag{ - // { - // Key: "environment", - // Value: "production", - // }, - // { - // Key: "location", - // Value: "paris", - // }, - // }, - // }, - // }, - // wants: wants{ - // notificationRules: []influxdb.NotificationRule{ - // &rule.Slack{ - // Base: rule.Base{ - // ID: MustIDBase16(fourID), - // OrgID: MustIDBase16(oneID), - // OwnerID: MustIDBase16(sixID), - // EndpointID: 1, - // Status: influxdb.Active, - // Name: "nr3", - // TagRules: []influxdb.TagRule{ - // { - // Tag: influxdb.Tag{ - // Key: "environment", - // Value: "production", - // }, - // Operator: notification.Equal, - // }, - // { - // Tag: influxdb.Tag{ - // Key: "location", - // Value: "paris", - // }, - // Operator: notification.Equal, - // }, - // }, - // }, - // MessageTemplate: "msg", - // }, - // }, - // }, - // }, - { - name: "find owners and restrict by organization", - fields: NotificationRuleFields{ - Orgs: []*influxdb.Organization{ - { - ID: MustIDBase16(oneID), - Name: "org1", - }, - { - ID: MustIDBase16(fourID), - Name: "org4", - }, - }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, - NotificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(oneID), - OrgID: MustIDBase16(fourID), - EndpointID: 1, - OwnerID: MustIDBase16(sixID), - Name: "nr1", - }, - Channel: "ch1", - MessageTemplate: "msg1", - }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(twoID), - OrgID: MustIDBase16(fourID), - OwnerID: MustIDBase16(sixID), - EndpointID: 1, - Name: "nr2", - }, - MessageTemplate: "body2", - }, - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(fourID), - OrgID: MustIDBase16(oneID), - OwnerID: MustIDBase16(sixID), - EndpointID: 1, - Name: "nr3", - }, - MessageTemplate: "msg", - }, - }, - }, - args: args{ - filter: influxdb.NotificationRuleFilter{ - OrgID: idPtr(MustIDBase16(oneID)), - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(sixID), - ResourceType: influxdb.NotificationRuleResourceType, - UserType: influxdb.Owner, - }, - }, - }, - wants: wants{ - notificationRules: []influxdb.NotificationRule{ - &rule.Slack{ - Base: rule.Base{ - ID: MustIDBase16(fourID), - OrgID: MustIDBase16(oneID), - OwnerID: MustIDBase16(sixID), - EndpointID: 1, - Name: "nr3", - }, - MessageTemplate: "msg", - }, - }, - }, - }, { name: "look for organization not bound to any notification rule", fields: NotificationRuleFields{ @@ -1270,26 +949,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1326,7 +985,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - OrgID: idPtr(MustIDBase16(oneID)), + OrgID: MustIDBase16Ptr(oneID), }, }, wants: wants{ @@ -1346,26 +1005,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1447,26 +1086,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1547,26 +1166,6 @@ func FindNotificationRules( Name: "org4", }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - { - ResourceID: MustIDBase16(twoID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - }, - { - ResourceID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1603,10 +1202,7 @@ func FindNotificationRules( }, args: args{ filter: influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: MustIDBase16(fourID), - ResourceType: influxdb.NotificationRuleResourceType, - }, + OrgID: MustIDBase16Ptr(threeID), }, }, wants: wants{}, @@ -1615,7 +1211,7 @@ func FindNotificationRules( for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -1634,7 +1230,7 @@ func FindNotificationRules( // UpdateNotificationRule testing. func UpdateNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { @@ -1657,20 +1253,6 @@ func UpdateNotificationRule( name: "can't find the id", fields: NotificationRuleFields{ TimeGenerator: fakeGenerator, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -1738,20 +1320,6 @@ func UpdateNotificationRule( fields: NotificationRuleFields{ TimeGenerator: fakeGenerator, IDGenerator: mock.NewIDGenerator(twoID, t), - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Tasks: []influxdb.TaskCreate{ { OwnerID: MustIDBase16(sixID), @@ -1767,7 +1335,7 @@ func UpdateNotificationRule( Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -1882,7 +1450,7 @@ func UpdateNotificationRule( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -1903,7 +1471,7 @@ func UpdateNotificationRule( // PatchNotificationRule testing. func PatchNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { @@ -1930,20 +1498,6 @@ func PatchNotificationRule( name: "can't find the id", fields: NotificationRuleFields{ TimeGenerator: fakeGenerator, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2010,27 +1564,13 @@ func PatchNotificationRule( `, }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Endpoints: []influxdb.NotificationEndpoint{ &endpoint.Slack{ URL: "http://localhost:7777", Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -2149,27 +1689,13 @@ func PatchNotificationRule( `, }, }, - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Endpoints: []influxdb.NotificationEndpoint{ &endpoint.Slack{ URL: "http://localhost:7777", Token: influxdb.SecretField{ // TODO(desa): not sure why this has to end in token, but it does Key: "020f755c3c082001-token", - Value: strPtr("abc123"), + Value: pointer.String("abc123"), }, Base: endpoint.Base{ OrgID: MustIDBase16Ptr(fourID), @@ -2278,7 +1804,7 @@ func PatchNotificationRule( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() @@ -2293,18 +1819,17 @@ func PatchNotificationRule( // DeleteNotificationRule testing. func DeleteNotificationRule( - init func(NotificationRuleFields, *testing.T) (influxdb.NotificationRuleStore, func()), + init notificationRuleFactory, t *testing.T, ) { type args struct { - id influxdb.ID - userID influxdb.ID + id influxdb.ID + orgID influxdb.ID } type wants struct { - notificationRules []influxdb.NotificationRule - userResourceMappings []*influxdb.UserResourceMapping - err error + notificationRules []influxdb.NotificationRule + err error } tests := []struct { name string @@ -2315,20 +1840,6 @@ func DeleteNotificationRule( { name: "bad id", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, NotificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2368,28 +1879,14 @@ func DeleteNotificationRule( }, }, args: args{ - id: influxdb.ID(0), - userID: MustIDBase16(sixID), + id: influxdb.ID(0), + orgID: MustIDBase16(fourID), }, wants: wants{ err: &influxdb.Error{ Code: influxdb.EInvalid, Msg: "provided notification rule ID has invalid format", }, - userResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, notificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2432,20 +1929,6 @@ func DeleteNotificationRule( { name: "none existing config", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, IDGenerator: mock.NewIDGenerator(twoID, t), Tasks: []influxdb.TaskCreate{ { @@ -2502,28 +1985,14 @@ func DeleteNotificationRule( }, }, args: args{ - id: MustIDBase16(fourID), - userID: MustIDBase16(sixID), + id: MustIDBase16(fourID), + orgID: MustIDBase16(fourID), }, wants: wants{ err: &influxdb.Error{ Code: influxdb.ENotFound, Msg: "notification rule not found", }, - userResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, notificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2566,20 +2035,6 @@ func DeleteNotificationRule( { name: "regular delete", fields: NotificationRuleFields{ - UserResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - { - ResourceID: MustIDBase16(twoID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Member, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, Tasks: []influxdb.TaskCreate{ { OwnerID: MustIDBase16(sixID), @@ -2637,18 +2092,10 @@ func DeleteNotificationRule( }, }, args: args{ - id: MustIDBase16(twoID), - userID: MustIDBase16(sixID), + id: MustIDBase16(twoID), + orgID: MustIDBase16(fourID), }, wants: wants{ - userResourceMappings: []*influxdb.UserResourceMapping{ - { - ResourceID: MustIDBase16(oneID), - UserID: MustIDBase16(sixID), - UserType: influxdb.Owner, - ResourceType: influxdb.NotificationRuleResourceType, - }, - }, notificationRules: []influxdb.NotificationRule{ &rule.Slack{ Base: rule.Base{ @@ -2675,17 +2122,14 @@ func DeleteNotificationRule( } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, done := init(tt.fields, t) + s, _, done := init(tt.fields, t) defer done() ctx := context.Background() err := s.DeleteNotificationRule(ctx, tt.args.id) ErrorsEqual(t, err, tt.wants.err) filter := influxdb.NotificationRuleFilter{ - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ - UserID: tt.args.userID, - ResourceType: influxdb.NotificationRuleResourceType, - }, + OrgID: &tt.args.orgID, } nrs, n, err := s.FindNotificationRules(ctx, filter) if err != nil && tt.wants.err == nil { @@ -2707,3 +2151,70 @@ func DeleteNotificationRule( }) } } + +// MustIDBase16 is an helper to ensure a correct ID is built during testing. +func MustIDBase16(s string) influxdb.ID { + id, err := influxdb.IDFromString(s) + if err != nil { + panic(err) + } + return *id +} + +// MustIDBase16Ptr is an helper to ensure a correct *ID is built during testing. +func MustIDBase16Ptr(s string) *influxdb.ID { + id := MustIDBase16(s) + return &id +} + +// ErrorsEqual checks to see if the provided errors are equivalent. +func ErrorsEqual(t *testing.T, actual, expected error) { + t.Helper() + if expected == nil && actual == nil { + return + } + + if expected == nil && actual != nil { + t.Errorf("unexpected error %s", actual.Error()) + } + + if expected != nil && actual == nil { + t.Errorf("expected error %s but received nil", expected.Error()) + } + + if influxdb.ErrorCode(expected) != influxdb.ErrorCode(actual) { + t.Logf("\nexpected: %v\nactual: %v\n\n", expected, actual) + t.Errorf("expected error code %q but received %q", influxdb.ErrorCode(expected), influxdb.ErrorCode(actual)) + } + + if influxdb.ErrorMessage(expected) != influxdb.ErrorMessage(actual) { + t.Logf("\nexpected: %v\nactual: %v\n\n", expected, actual) + t.Errorf("expected error message %q but received %q", influxdb.ErrorMessage(expected), influxdb.ErrorMessage(actual)) + } +} + +func idPtr(id influxdb.ID) *influxdb.ID { + return &id +} + +func mustDuration(d string) *notification.Duration { + dur, err := time.ParseDuration(d) + if err != nil { + panic(err) + } + + ndur, err := notification.FromTimeDuration(dur) + if err != nil { + panic(err) + } + + // Filter out the zero values from the duration. + durs := make([]ast.Duration, 0, len(ndur.Values)) + for _, d := range ndur.Values { + if d.Magnitude != 0 { + durs = append(durs, d) + } + } + ndur.Values = durs + return &ndur +} diff --git a/notification/rule/service/service_test.go b/notification/rule/service/service_test.go new file mode 100644 index 00000000000..e0395222eff --- /dev/null +++ b/notification/rule/service/service_test.go @@ -0,0 +1,162 @@ +package service + +import ( + "context" + "errors" + "io/ioutil" + "os" + "testing" + + influxdb "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/bolt" + "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/kv/migration/all" + "github.com/influxdata/influxdb/v2/mock" + _ "github.com/influxdata/influxdb/v2/query/builtin" + "github.com/influxdata/influxdb/v2/query/fluxlang" + "github.com/influxdata/influxdb/v2/tenant" + "go.uber.org/zap/zaptest" +) + +func TestInmemNotificationRuleStore(t *testing.T) { + NotificationRuleStore(initInmemNotificationRuleStore, t) +} + +func initInmemNotificationRuleStore(f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) { + store := inmem.NewKVStore() + if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil { + t.Fatal(err) + } + + svc, tsvc, closeSvc := initNotificationRuleStore(store, f, t) + return svc, tsvc, func() { + closeSvc() + } +} + +func initBoltNotificationRuleStore(f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) { + store, closeBolt, err := newTestBoltStore(t) + if err != nil { + t.Fatal(err) + } + + svc, tsvc, closeSvc := initNotificationRuleStore(store, f, t) + return svc, tsvc, func() { + closeSvc() + closeBolt() + } +} + +func TestBoltNotificationRuleStore(t *testing.T) { + NotificationRuleStore(initBoltNotificationRuleStore, t) +} + +func initNotificationRuleStore(s kv.Store, f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) { + logger := zaptest.NewLogger(t) + kvsvc := kv.NewService(logger, s, kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) + kvsvc.IDGenerator = f.IDGenerator + kvsvc.TimeGenerator = f.TimeGenerator + if f.TimeGenerator == nil { + kvsvc.TimeGenerator = influxdb.RealTimeGenerator{} + } + + var ( + tenantStore = tenant.NewStore(s) + tenantSvc = tenant.NewService(tenantStore) + ) + + svc, err := NewRuleService(logger, s, kvsvc, tenantSvc, kvsvc) + if err != nil { + t.Fatal(err) + } + + svc.idGenerator = f.IDGenerator + if f.TimeGenerator != nil { + svc.timeGenerator = f.TimeGenerator + } + + ctx := context.Background() + for _, o := range f.Orgs { + withOrgID(tenantStore, o.ID, func() { + if err := tenantSvc.CreateOrganization(ctx, o); err != nil { + t.Fatalf("failed to populate org: %v", err) + } + }) + } + + for _, e := range f.Endpoints { + if err := kvsvc.CreateNotificationEndpoint(ctx, e, 1); err != nil { + t.Fatalf("failed to populate notification endpoint: %v", err) + } + } + + for _, nr := range f.NotificationRules { + nrc := influxdb.NotificationRuleCreate{ + NotificationRule: nr, + Status: influxdb.Active, + } + if err := svc.PutNotificationRule(ctx, nrc); err != nil { + t.Fatalf("failed to populate notification rule: %v", err) + } + } + + for _, c := range f.Tasks { + if _, err := kvsvc.CreateTask(ctx, c); err != nil { + t.Fatalf("failed to populate task: %v", err) + } + } + + return svc, kvsvc, func() { + for _, nr := range f.NotificationRules { + if err := svc.DeleteNotificationRule(ctx, nr.GetID()); err != nil { + t.Logf("failed to remove notification rule: %v", err) + } + } + for _, o := range f.Orgs { + if err := tenantSvc.DeleteOrganization(ctx, o.ID); err != nil { + t.Fatalf("failed to remove org: %v", err) + } + } + } +} + +func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) { + backup := store.OrgIDGen + defer func() { store.OrgIDGen = backup }() + + store.OrgIDGen = mock.NewStaticIDGenerator(orgID) + + fn() +} + +func newTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) { + f, err := ioutil.TempFile("", "influxdata-bolt-") + if err != nil { + return nil, nil, errors.New("unable to open temporary boltdb file") + } + f.Close() + + ctx := context.Background() + logger := zaptest.NewLogger(t) + path := f.Name() + + // skip fsync to improve test performance + s := bolt.NewKVStore(logger, path, bolt.WithNoSync) + if err := s.Open(context.Background()); err != nil { + return nil, nil, err + } + + if err := all.Up(ctx, logger, s); err != nil { + return nil, nil, err + } + + close := func() { + s.Close() + os.Remove(path) + } + + return s, close, nil +} diff --git a/pkger/service.go b/pkger/service.go index a4b874a70c2..a485b430d4a 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -3536,10 +3536,11 @@ func (r *rollbackCoordinator) runTilEnd(ctx context.Context, orgID, userID influ defer cancel() defer func() { - if recover() != nil { + if err := recover(); err != nil { r.logger.Error( "panic applying "+resource, zap.String("stack_trace", fmt.Sprintf("%+v", stack.Trace())), + zap.Reflect("panic", err), ) errStr.add(errMsg{ resource: resource, diff --git a/testing/notification_endpoint.go b/testing/notification_endpoint.go index ffd5ee865af..1f71b3ab5f6 100644 --- a/testing/notification_endpoint.go +++ b/testing/notification_endpoint.go @@ -27,7 +27,6 @@ type NotificationEndpointFields struct { var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)} var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)} -var time3 = time.Date(2006, time.July, 15, 5, 23, 53, 10, time.UTC) var notificationEndpointCmpOptions = cmp.Options{ cmp.Transformer("Sort", func(in []influxdb.NotificationEndpoint) []influxdb.NotificationEndpoint {