Skip to content

Commit

Permalink
fix: close cursors in storage, move valid resource and org ctx to kit
Browse files Browse the repository at this point in the history
  • Loading branch information
AlirieGray committed May 19, 2020
1 parent 4778612 commit 614b611
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 92 deletions.
9 changes: 9 additions & 0 deletions id.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ var (
}
)

// ErrCorruptID means the ID stored in the Store is corrupt.
func ErrCorruptID(err error) *Error {
return &Error{
Code: EInvalid,
Msg: "corrupt ID provided",
Err: err,
}
}

// ID is a unique identifier.
//
// Its zero value is not a valid ID.
Expand Down
42 changes: 42 additions & 0 deletions kit/transport/http/middleware.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package http

import (
"context"
"net/http"
"path"
"strings"
"time"

"github.com/go-chi/chi"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
ua "github.com/mileusna/useragent"
Expand Down Expand Up @@ -130,3 +132,43 @@ func shiftPath(p string) (head, tail string) {
}
return p[1:i], p[i:]
}

type OrgContext string

const CtxOrgKey OrgContext = "orgID"

// ValidResource make sure a resource exists when a sub system needs to be mounted to an api
func ValidResource(api *API, lookupOrgByResourceID func(context.Context, influxdb.ID) (influxdb.ID, error)) Middleware {
return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
statusW := NewStatusResponseWriter(w)
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
api.Err(w, r, influxdb.ErrCorruptID(err))
return
}

ctx := r.Context()

orgID, err := lookupOrgByResourceID(ctx, *id)
if err != nil {
api.Err(w, r, err)
return
}

// embed OrgID into context
next.ServeHTTP(statusW, r.WithContext(context.WithValue(ctx, CtxOrgKey, orgID)))
}
return http.HandlerFunc(fn)
}
}

// OrgIDFromContext ....
func OrgIDFromContext(ctx context.Context) *influxdb.ID {
v := ctx.Value(CtxOrgKey)
if v == nil {
return nil
}
id := v.(influxdb.ID)
return &id
}
1 change: 1 addition & 0 deletions kv/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (s *Service) PutLabel(ctx context.Context, l *influxdb.Label) error {
})
}

// CreateUserResourceMappingForOrg can be made private once URMs are removed from the Label Service
func (s *Service) CreateUserResourceMappingForOrg(ctx context.Context, tx Tx, orgID influxdb.ID, resID influxdb.ID, resType influxdb.ResourceType) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
Expand Down
9 changes: 5 additions & 4 deletions label/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,16 @@ func (h *LabelHandler) handlePostLabel(w http.ResponseWriter, r *http.Request) {
return
}

// TODO(al): ensure that the specified org actually exists
// can be done in service
if err := label.Validate(); err != nil {
h.api.Err(w, r, err)
return
}

if err := h.labelSvc.CreateLabel(r.Context(), &label); err != nil {
h.api.Err(w, r, err)
return
}
h.log.Debug("Label created", zap.String("label", fmt.Sprint(label)))
// todo (al) add logging to middleware

h.api.Respond(w, r, http.StatusCreated, newLabelResponse(&label))
}
Expand All @@ -111,7 +112,7 @@ func (h *LabelHandler) handleGetLabel(w http.ResponseWriter, r *http.Request) {
if err != nil {
h.api.Err(w, r, err)
return
} // old message label is not valid
}

l, err := h.labelSvc.FindLabelByID(r.Context(), *id)
if err != nil {
Expand Down
16 changes: 2 additions & 14 deletions label/middleware_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ import (

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
)

var _ influxdb.LabelService = (*AuthedLabelService)(nil)

type labelContext string

const ctxOrgKey labelContext = "orgID"

type AuthedLabelService struct {
s influxdb.LabelService
}
Expand Down Expand Up @@ -67,7 +64,7 @@ func (s *AuthedLabelService) FindResourceLabels(ctx context.Context, filter infl
}

// check the permissions for the resource by the org on the context
orgID := orgIDFromContext(ctx)
orgID := kithttp.OrgIDFromContext(ctx)
if orgID == nil {
return nil, errors.New("failed to find orgID on context")
}
Expand Down Expand Up @@ -134,12 +131,3 @@ func (s *AuthedLabelService) DeleteLabelMapping(ctx context.Context, m *influxdb
}
return s.s.DeleteLabelMapping(ctx, m)
}

func orgIDFromContext(ctx context.Context) *influxdb.ID {
v := ctx.Value(ctxOrgKey)
if v == nil {
return nil
}
id := v.(influxdb.ID)
return &id
}
5 changes: 3 additions & 2 deletions label/middleware_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
influxdbcontext "github.com/influxdata/influxdb/v2/context"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
)
Expand Down Expand Up @@ -268,7 +269,7 @@ func TestLabelService_FindLabels(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := NewAuthedLabelService(tt.fields.LabelService)

ctx := context.WithValue(context.Background(), ctxOrgKey, orgOneInfluxID)
ctx := context.WithValue(context.Background(), kithttp.CtxOrgKey, orgOneInfluxID)
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, []influxdb.Permission{tt.args.permission}))

labels, err := s.FindLabels(ctx, influxdb.LabelFilter{})
Expand Down Expand Up @@ -812,7 +813,7 @@ func TestLabelService_FindResourceLabels(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := NewAuthedLabelService(tt.fields.LabelService)

ctx := context.WithValue(context.Background(), ctxOrgKey, orgOneInfluxID)
ctx := context.WithValue(context.Background(), kithttp.CtxOrgKey, orgOneInfluxID)
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))

labels, err := s.FindResourceLabels(ctx, tt.args.filter)
Expand Down
5 changes: 2 additions & 3 deletions label/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (s *Service) CreateLabel(ctx context.Context, l *influxdb.Label) error {
if err := s.store.CreateLabel(ctx, tx, l); err != nil {
return err
}

if err := s.kvSvc.CreateUserResourceMappingForOrg(ctx, tx, l.OrgID, l.ID, influxdb.LabelsResourceType); err != nil {
return err
}
Expand Down Expand Up @@ -115,7 +114,7 @@ func (s *Service) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.
})

if err != nil {
return nil, err // todo (al) not found error?
return nil, err
}

if len(upd.Properties) > 0 && label.Properties == nil {
Expand Down Expand Up @@ -224,7 +223,7 @@ func (s *Service) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMappi
return nil
})
if err != nil {
return err // todo (al) not found error
return err
}

return s.store.Update(ctx, func(tx kv.Tx) error {
Expand Down
15 changes: 11 additions & 4 deletions label/storage_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ func (s *Store) UpdateLabel(ctx context.Context, tx kv.Tx, l *influxdb.Label) er
return nil
}

// TODO (al) get rid of mappings here...?
// https://github.com/influxdata/influxdb/issues/11278
func (s *Store) DeleteLabel(ctx context.Context, tx kv.Tx, id influxdb.ID) error {
label, err := s.GetLabel(ctx, tx, id)
if err != nil {
Expand Down Expand Up @@ -290,7 +288,12 @@ func (s *Store) FindResourceLabels(ctx context.Context, tx kv.Tx, filter influxd

*ls = append(*ls, l)
}
return nil

if err := cur.Err(); err != nil {
return err
}

return cur.Close()
}

func (s *Store) DeleteLabelMapping(ctx context.Context, tx kv.Tx, m *influxdb.LabelMapping) error {
Expand Down Expand Up @@ -409,7 +412,11 @@ func forEachLabel(ctx context.Context, tx kv.Tx, fn func(*influxdb.Label) bool)
}
}

return nil
if err := cur.Err(); err != nil {
return err
}

return cur.Close()
}

// uniqueID returns nil if the ID provided is unique, returns an error otherwise
Expand Down
9 changes: 0 additions & 9 deletions tenant/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ var (
}
)

// ErrCorruptID the ID stored in the Store is corrupt.
func ErrCorruptID(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "corrupt ID provided",
Err: err,
}
}

// ErrInternalServiceError is used when the error comes from an internal system.
func ErrInternalServiceError(err error) *influxdb.Error {
return &influxdb.Error{
Expand Down
48 changes: 0 additions & 48 deletions tenant/http_server.go

This file was deleted.

2 changes: 1 addition & 1 deletion tenant/http_server_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewHTTPBucketHandler(log *zap.Logger, bucketSvc influxdb.BucketService, urm
r.Delete("/", svr.handleDeleteBucket)

// mount embedded resources
mountableRouter := r.With(ValidResource(svr.api, svr.lookupOrgByBucketID))
mountableRouter := r.With(kithttp.ValidResource(svr.api, svr.lookupOrgByBucketID))
mountableRouter.Mount("/members", urmHandler)
mountableRouter.Mount("/owners", urmHandler)
mountableRouter.Mount("/labels", labelHandler)
Expand Down
2 changes: 1 addition & 1 deletion tenant/http_server_org.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewHTTPOrgHandler(log *zap.Logger, orgService influxdb.OrganizationService,
r.Delete("/", svr.handleDeleteOrg)

// mount embedded resources
mountableRouter := r.With(ValidResource(svr.api, svr.lookupOrgByID))
mountableRouter := r.With(kithttp.ValidResource(svr.api, svr.lookupOrgByID))
mountableRouter.Mount("/members", urm)
mountableRouter.Mount("/owners", urm)
mountableRouter.Mount("/labels", labelHandler)
Expand Down
7 changes: 4 additions & 3 deletions tenant/middleware_urm_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
)

type AuthedURMService struct {
Expand All @@ -26,8 +27,8 @@ func (s *AuthedURMService) FindUserResourceMappings(ctx context.Context, filter
}

authedUrms := urms[:0]
orgID := kithttp.OrgIDFromContext(ctx)
for _, urm := range urms {
orgID := orgIDFromContext(ctx) // todo (al) could this be moved out?
if orgID != nil {
if _, _, err := authorizer.AuthorizeRead(ctx, urm.ResourceType, urm.ResourceID, *orgID); err != nil {
continue
Expand All @@ -44,7 +45,7 @@ func (s *AuthedURMService) FindUserResourceMappings(ctx context.Context, filter
}

func (s *AuthedURMService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
orgID := orgIDFromContext(ctx)
orgID := kithttp.OrgIDFromContext(ctx)
if orgID != nil {
if _, _, err := authorizer.AuthorizeWrite(ctx, m.ResourceType, m.ResourceID, *orgID); err != nil {
return err
Expand All @@ -71,7 +72,7 @@ func (s *AuthedURMService) DeleteUserResourceMapping(ctx context.Context, resour

// There should only be one because resourceID and userID are used to create the primary key for urms
for _, urm := range urms {
orgID := orgIDFromContext(ctx)
orgID := kithttp.OrgIDFromContext(ctx)
if orgID != nil {
if _, _, err := authorizer.AuthorizeWrite(ctx, urm.ResourceType, urm.ResourceID, *orgID); err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion tenant/middleware_urm_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
influxdbcontext "github.com/influxdata/influxdb/v2/context"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
)
Expand Down Expand Up @@ -105,7 +106,7 @@ func TestURMService_FindUserResourceMappings(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := NewAuthedURMService(tt.fields.OrgService, tt.fields.UserResourceMappingService)
orgID := influxdbtesting.IDPtr(10)
ctx := context.WithValue(context.Background(), ctxOrgKey, *orgID)
ctx := context.WithValue(context.Background(), kithttp.CtxOrgKey, *orgID)
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))

urms, _, err := s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{})
Expand Down
2 changes: 1 addition & 1 deletion tenant/storage_org.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *Store) GetOrgByName(ctx context.Context, tx kv.Tx, n string) (*influxdb

var id influxdb.ID
if err := id.Decode(uid); err != nil {
return nil, ErrCorruptID(err)
return nil, influxdb.ErrCorruptID(err)
}
return s.GetOrg(ctx, tx, id)
}
Expand Down
2 changes: 1 addition & 1 deletion tenant/storage_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *Store) GetUserByName(ctx context.Context, tx kv.Tx, n string) (*influxd

var id influxdb.ID
if err := id.Decode(uid); err != nil {
return nil, ErrCorruptID(err)
return nil, influxdb.ErrCorruptID(err)
}
return s.GetUser(ctx, tx, id)
}
Expand Down

0 comments on commit 614b611

Please sign in to comment.