Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(notification): move rule service into own package #19804

Merged
merged 5 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion authorizer/authorize_find.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func AuthorizeFindChecks(ctx context.Context, rs []influxdb.Check) ([]influxdb.C
}

// AuthorizeFindUserResourceMappings takes the given items and returns only the ones that the user is authorized to read.
func AuthorizeFindUserResourceMappings(ctx context.Context, os OrganizationService, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) {
func AuthorizeFindUserResourceMappings(ctx context.Context, os OrgIDResolver, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) {
// This filters without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
rrs := rs[:0]
Expand Down
17 changes: 7 additions & 10 deletions authorizer/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package authorizer

import (
"context"
"errors"

"github.com/influxdata/influxdb/v2"
)
Expand All @@ -12,16 +11,16 @@ var _ influxdb.LabelService = (*LabelService)(nil)
// LabelService wraps a influxdb.LabelService and authorizes actions
// against it appropriately.
type LabelService struct {
s influxdb.LabelService
orgSvc OrganizationService
s influxdb.LabelService
orgIDResolver OrgIDResolver
}

// NewLabelServiceWithOrg constructs an instance of an authorizing label serivce.
// Replaces NewLabelService.
func NewLabelServiceWithOrg(s influxdb.LabelService, orgSvc OrganizationService) *LabelService {
func NewLabelServiceWithOrg(s influxdb.LabelService, orgIDResolver OrgIDResolver) *LabelService {
return &LabelService{
s: s,
orgSvc: orgSvc,
s: s,
orgIDResolver: orgIDResolver,
}
}

Expand Down Expand Up @@ -55,10 +54,8 @@ func (s *LabelService) FindResourceLabels(ctx context.Context, filter influxdb.L
if err := filter.ResourceType.Valid(); err != nil {
return nil, err
}
if s.orgSvc == nil {
return nil, errors.New("failed to find orgSvc")
}
orgID, err := s.orgSvc.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID)

orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID)
if err != nil {
return nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions authorizer/urm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import (
"github.com/influxdata/influxdb/v2"
)

type OrganizationService interface {
type OrgIDResolver interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not objecting to the name, but throwing out OrgIDFinder as an alternative, in case you like it and the verb is FindResourceOrganizationID.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good shout. I actually rename this again in two PRs time to just Resolver (in the Telegraf PR). But I will rename it Finder in that PR as it finds both org IDs and name 👍

FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error)
}

type URMService struct {
s influxdb.UserResourceMappingService
orgService OrganizationService
s influxdb.UserResourceMappingService
orgIDResolver OrgIDResolver
}

func NewURMService(orgSvc OrganizationService, s influxdb.UserResourceMappingService) *URMService {
func NewURMService(orgIDResolver OrgIDResolver, s influxdb.UserResourceMappingService) *URMService {
return &URMService{
s: s,
orgService: orgSvc,
s: s,
orgIDResolver: orgIDResolver,
}
}

Expand All @@ -27,11 +27,11 @@ func (s *URMService) FindUserResourceMappings(ctx context.Context, filter influx
if err != nil {
return nil, 0, err
}
return AuthorizeFindUserResourceMappings(ctx, s.orgService, urms)
return AuthorizeFindUserResourceMappings(ctx, s.orgIDResolver, urms)
}

func (s *URMService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
orgID, err := s.orgService.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID)
orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID)
if err != nil {
return err
}
Expand All @@ -49,7 +49,7 @@ func (s *URMService) DeleteUserResourceMapping(ctx context.Context, resourceID i
}

for _, urm := range urms {
orgID, err := s.orgService.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID)
orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions authorizer/urm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (s *OrgService) FindResourceOrganizationID(ctx context.Context, rt influxdb
func TestURMService_FindUserResourceMappings(t *testing.T) {
type fields struct {
UserResourceMappingService influxdb.UserResourceMappingService
OrgService authorizer.OrganizationService
OrgService authorizer.OrgIDResolver
}
type args struct {
permission influxdb.Permission
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestURMService_FindUserResourceMappings(t *testing.T) {
func TestURMService_WriteUserResourceMapping(t *testing.T) {
type fields struct {
UserResourceMappingService influxdb.UserResourceMappingService
OrgService authorizer.OrganizationService
OrgService authorizer.OrgIDResolver
}
type args struct {
permission influxdb.Permission
Expand Down
10 changes: 5 additions & 5 deletions cmd/influx/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func taskFindF(cmd *cobra.Command, args []string) error {
}
filter.Limit = taskFindFlags.limit

var tasks []http.Task
var tasks []*influxdb.Task

if taskFindFlags.id != "" {
id, err := influxdb.IDFromString(taskFindFlags.id)
Expand All @@ -184,7 +184,7 @@ func taskFindF(cmd *cobra.Command, args []string) error {
return err
}

tasks = append(tasks, *task)
tasks = append(tasks, task)
} else {
tasks, _, err = s.FindTasks(context.Background(), filter)
if err != nil {
Expand Down Expand Up @@ -322,8 +322,8 @@ func taskDeleteF(cmd *cobra.Command, args []string) error {
type taskPrintOpts struct {
hideHeaders bool
json bool
task *http.Task
tasks []http.Task
task *influxdb.Task
tasks []*influxdb.Task
}

func printTasks(w io.Writer, opts taskPrintOpts) error {
Expand Down Expand Up @@ -351,7 +351,7 @@ func printTasks(w io.Writer, opts taskPrintOpts) error {
)

if opts.task != nil {
opts.tasks = append(opts.tasks, *opts.task)
opts.tasks = append(opts.tasks, opts.task)
}

for _, t := range opts.tasks {
Expand Down
32 changes: 30 additions & 2 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
iqlquery "github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/internal/resource"
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/kit/feature"
overrideflagger "github.com/influxdata/influxdb/v2/kit/feature/override"
Expand All @@ -45,6 +46,7 @@ import (
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
"github.com/influxdata/influxdb/v2/pkger"
infprom "github.com/influxdata/influxdb/v2/prometheus"
"github.com/influxdata/influxdb/v2/query"
Expand Down Expand Up @@ -989,7 +991,14 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService)
if err != nil {
return err
}

// tasks service notification middleware which keeps task service up to date
// with persisted changes to notification rules.
notificationRuleSvc = middleware.NewNotificationRuleStore(notificationRuleSvc, m.kvService, coordinator)
}

// NATS streaming server
Expand Down Expand Up @@ -1121,6 +1130,25 @@ func (m *Launcher) run(ctx context.Context) (err error) {
onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, metric.WithSuffix("new")) // with metrics
onboardSvc = tenant.NewOnboardingLogger(m.log.With(zap.String("handler", "onboard")), onboardSvc) // with logging

// orgIDResolver is a deprecated type which combines the lookups
// of multiple resources into one type, used to resolve the resources
// associated org ID. It is a stop-gap while we move this behaviour
// off of *kv.Service to aid in reducing the coupling on this type.
orgIDResolver := &resource.OrgIDResolver{
AuthorizationFinder: authSvc,
BucketFinder: ts.BucketService,
OrganizationFinder: ts.OrganizationService,
DashboardFinder: dashboardSvc,
SourceFinder: sourceSvc,
TaskFinder: taskSvc,
TelegrafConfigFinder: telegrafSvc,
VariableFinder: variableSvc,
TargetFinder: scraperTargetSvc,
CheckFinder: checkSvc,
NotificationEndpointFinder: notificationEndpointStore,
NotificationRuleFinder: notificationRuleSvc,
}

m.apibackend = &http.APIBackend{
AssetsPath: m.assetsPath,
HTTPErrorHandler: kithttp.ErrorHandler(0),
Expand Down Expand Up @@ -1169,7 +1197,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
SecretService: secretSvc,
LookupService: lookupSvc,
DocumentService: m.kvService,
OrgLookupService: m.kvService,
OrgLookupService: orgIDResolver,
WriteEventRecorder: infprom.NewEventRecorder("write"),
QueryEventRecorder: infprom.NewEventRecorder("query"),
Flagger: m.flagger,
Expand Down
35 changes: 18 additions & 17 deletions cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
influxdbcontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/http"
Expand All @@ -37,10 +37,10 @@ type TestLauncher struct {
Path string

// Initialized after calling the Setup() helper.
User *platform.User
Org *platform.Organization
Bucket *platform.Bucket
Auth *platform.Authorization
User *influxdb.User
Org *influxdb.Organization
Bucket *influxdb.Bucket
Auth *influxdb.Authorization

httpClient *httpc.Client

Expand Down Expand Up @@ -127,7 +127,7 @@ func (tl *TestLauncher) ShutdownOrFail(tb testing.TB, ctx context.Context) {

// Setup creates a new user, bucket, org, and auth token.
func (tl *TestLauncher) Setup() error {
results, err := tl.OnBoard(&platform.OnboardingRequest{
results, err := tl.OnBoard(&influxdb.OnboardingRequest{
User: "USER",
Password: "PASSWORD",
Org: "ORG",
Expand All @@ -153,13 +153,13 @@ func (tl *TestLauncher) SetupOrFail(tb testing.TB) {

// OnBoard attempts an on-boarding request.
// The on-boarding status is also reset to allow multiple user/org/buckets to be created.
func (tl *TestLauncher) OnBoard(req *platform.OnboardingRequest) (*platform.OnboardingResults, error) {
func (tl *TestLauncher) OnBoard(req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) {
return tl.apibackend.OnboardingService.OnboardInitialUser(context.Background(), req)
}

// OnBoardOrFail attempts an on-boarding request or fails on error.
// The on-boarding status is also reset to allow multiple user/org/buckets to be created.
func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingRequest) *platform.OnboardingResults {
func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *influxdb.OnboardingRequest) *influxdb.OnboardingResults {
tb.Helper()
res, err := tl.OnBoard(req)
if err != nil {
Expand All @@ -169,7 +169,7 @@ func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingReq
}

// WriteOrFail attempts a write to the organization and bucket identified by to or fails if there is an error.
func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *platform.OnboardingResults, data string) {
func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *influxdb.OnboardingResults, data string) {
tb.Helper()
resp, err := nethttp.DefaultClient.Do(tl.NewHTTPRequestOrFail(tb, "POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", to.Org.ID, to.Bucket.ID), to.Auth.Token, data))
if err != nil {
Expand Down Expand Up @@ -295,7 +295,7 @@ func (tl *TestLauncher) QueryAndNopConsume(ctx context.Context, req *query.Reque

// FluxQueryOrFail performs a query to the specified organization and returns the results
// or fails if there is an error.
func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organization, token string, query string) string {
func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *influxdb.Organization, token string, query string) string {
tb.Helper()

b, err := http.SimpleQuery(tl.URL(), query, org.Name, token)
Expand All @@ -308,7 +308,7 @@ func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organizatio

// QueryFlux returns the csv response from a flux query.
// It also removes all the \r to make it easier to write tests.
func (tl *TestLauncher) QueryFlux(tb testing.TB, org *platform.Organization, token, query string) string {
func (tl *TestLauncher) QueryFlux(tb testing.TB, org *influxdb.Organization, token, query string) string {
tb.Helper()

b, err := http.SimpleQuery(tl.URL(), query, org.Name, token)
Expand Down Expand Up @@ -367,7 +367,7 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService {
return &http.BucketService{Client: tl.HTTPClient(tb)}
}

func (tl *TestLauncher) CheckService() platform.CheckService {
func (tl *TestLauncher) CheckService() influxdb.CheckService {
return tl.kvService
}

Expand All @@ -386,19 +386,20 @@ func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.Notific
return http.NewNotificationEndpointService(tl.HTTPClient(tb))
}

func (tl *TestLauncher) NotificationRuleService() platform.NotificationRuleStore {
return tl.kvService
func (tl *TestLauncher) NotificationRuleService(tb testing.TB) influxdb.NotificationRuleStore {
tb.Helper()
return http.NewNotificationRuleService(tl.HTTPClient(tb))
}

func (tl *TestLauncher) OrgService(tb testing.TB) platform.OrganizationService {
func (tl *TestLauncher) OrgService(tb testing.TB) influxdb.OrganizationService {
return tl.kvService
}

func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC {
return &pkger.HTTPRemoteService{Client: tl.HTTPClient(tb)}
}

func (tl *TestLauncher) TaskServiceKV() platform.TaskService {
func (tl *TestLauncher) TaskServiceKV(tb testing.TB) influxdb.TaskService {
return tl.kvService
}

Expand All @@ -416,7 +417,7 @@ func (tl *TestLauncher) AuthorizationService(tb testing.TB) *http.AuthorizationS
return &http.AuthorizationService{Client: tl.HTTPClient(tb)}
}

func (tl *TestLauncher) TaskService(tb testing.TB) *http.TaskService {
func (tl *TestLauncher) TaskService(tb testing.TB) influxdb.TaskService {
return &http.TaskService{Client: tl.HTTPClient(tb)}
}

Expand Down
Loading