Skip to content

Commit

Permalink
fix(telegraf): support pagination parameters when listing
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgeMac committed Nov 4, 2020
1 parent 42445a7 commit b274e15
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 87 deletions.
8 changes: 4 additions & 4 deletions cmd/influx/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func (b *cmdTelegrafBuilder) listRunE(cmd *cobra.Command, args []string) error {
return b.writeTelegrafConfig(cfg)
}

cfgs, _, err := svc.FindTelegrafConfigs(context.Background(), influxdb.TelegrafConfigFilter{
OrgID: &orgID,
UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ResourceType: influxdb.TelegrafsResourceType},
})
cfgs, _, err := svc.FindTelegrafConfigs(context.Background(),
influxdb.TelegrafConfigFilter{
OrgID: &orgID,
})
if err != nil {
return err
}
Expand Down
48 changes: 25 additions & 23 deletions dbrp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
Expand Down Expand Up @@ -160,38 +159,39 @@ func (s *Service) unsetDefault(tx kv.Tx, compKey []byte) error {
// getFirstBut returns the first element in the db/rp index (not accounting for the `skipID`).
// If the length of the returned ID is 0, it means no element was found.
// The skip value is useful, for instance, if one wants to delete an element based on the result of this operation.
func (s *Service) getFirstBut(tx kv.Tx, compKey []byte, skipID []byte) ([]byte, error) {
stop := fmt.Errorf("stop")
var next []byte
if err := s.byOrgAndDatabase.Walk(context.Background(), tx, compKey, func(k, v []byte) error {
func (s *Service) getFirstBut(tx kv.Tx, compKey []byte, skipID []byte) (next []byte, err error) {
err = s.byOrgAndDatabase.Walk(context.Background(), tx, compKey, func(k, v []byte) (bool, error) {
if bytes.Equal(skipID, k) {
return nil
return true, nil
}

next = k
return stop
}); err != nil && err != stop {
return nil, ErrInternalService(err)
}
return next, nil

return false, nil
})
return
}

// isDBRPUnique verifies if the triple orgID-database-retention-policy is unique.
func (s *Service) isDBRPUnique(ctx context.Context, m influxdb.DBRPMappingV2) error {
return s.store.View(ctx, func(tx kv.Tx) error {
return s.byOrgAndDatabase.Walk(ctx, tx, composeForeignKey(m.OrganizationID, m.Database), func(k, v []byte) error {
return s.byOrgAndDatabase.Walk(ctx, tx, composeForeignKey(m.OrganizationID, m.Database), func(k, v []byte) (bool, error) {
dbrp := &influxdb.DBRPMappingV2{}
if err := json.Unmarshal(v, dbrp); err != nil {
return ErrInternalService(err)
return false, ErrInternalService(err)
}

if dbrp.ID == m.ID {
// Corner case.
// This is the very same DBRP, just skip it!
return nil
return true, nil
}

if dbrp.RetentionPolicy == m.RetentionPolicy {
return ErrDBRPAlreadyExists("another DBRP mapping with same orgID, db, and rp exists")
return false, ErrDBRPAlreadyExists("another DBRP mapping with same orgID, db, and rp exists")
}
return nil

return true, nil
})
})
}
Expand Down Expand Up @@ -254,22 +254,23 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
}

ms := []*influxdb.DBRPMappingV2{}
add := func(tx kv.Tx) func(k, v []byte) error {
return func(k, v []byte) error {
add := func(tx kv.Tx) func(k, v []byte) (bool, error) {
return func(k, v []byte) (bool, error) {
m := influxdb.DBRPMappingV2{}
if err := json.Unmarshal(v, &m); err != nil {
return ErrInternalService(err)
return false, ErrInternalService(err)
}
// Updating the Default field must be done before filtering.
defID, err := get(tx, m.OrganizationID, m.Database)
if err != nil {
return ErrInternalService(err)
return false, ErrInternalService(err)
}

m.Default = m.ID == *defID
if filterFunc(&m, filter) {
ms = append(ms, &m)
}
return nil
return true, nil
}
}

Expand Down Expand Up @@ -303,7 +304,8 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
if err != nil {
return ErrInternalService(err)
}
return add(tx)(defID, v)
_, err = add(tx)(defID, v)
return err
}
}
return s.byOrgAndDatabase.Walk(ctx, tx, compKey, add(tx))
Expand All @@ -318,7 +320,7 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
}

for k, v := cur.First(); k != nil; k, v = cur.Next() {
if err := add(tx)(k, v); err != nil {
if _, err := add(tx)(k, v); err != nil {
return err
}
}
Expand Down
15 changes: 3 additions & 12 deletions http/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,8 @@ func (h *TelegrafHandler) handleGetTelegraf(w http.ResponseWriter, r *http.Reque

func decodeTelegrafConfigFilter(ctx context.Context, r *http.Request) (*influxdb.TelegrafConfigFilter, error) {
f := &influxdb.TelegrafConfigFilter{}
urm, err := decodeUserResourceMappingFilter(ctx, r, influxdb.TelegrafsResourceType)
if err == nil {
f.UserResourceMappingFilter = *urm
}

q := r.URL.Query()

if orgIDStr := q.Get("orgID"); orgIDStr != "" {
orgID, err := influxdb.IDFromString(orgIDStr)
if err != nil {
Expand All @@ -297,7 +293,8 @@ func decodeTelegrafConfigFilter(ctx context.Context, r *http.Request) (*influxdb
} else if orgNameStr := q.Get("org"); orgNameStr != "" {
f.Organization = &orgNameStr
}
return f, err

return f, nil
}

// handlePostTelegraf is the HTTP handler for the POST /api/v2/telegrafs route.
Expand Down Expand Up @@ -445,12 +442,6 @@ func (s *TelegrafService) FindTelegrafConfigs(ctx context.Context, f influxdb.Te
if f.Organization != nil {
params = append(params, [2]string{"organization", *f.Organization})
}
if f.ResourceID != 0 {
params = append(params, [2]string{"resourceID", f.ResourceID.String()})
}
if f.UserID != 0 {
params = append(params, [2]string{"userID", f.UserID.String()})
}

var resp struct {
Configs []*influxdb.TelegrafConfig `json:"configurations"`
Expand Down
6 changes: 3 additions & 3 deletions kv/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Buck

for i, value := range values {
if value != nil {
if err := visit(keys[i], value); err != nil {
if cont, err := visit(keys[i], value); !cont || err != nil {
return err
}
}
Expand Down Expand Up @@ -390,9 +390,9 @@ func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, err
return err
}

return WalkCursor(ctx, cursor, func(k, v []byte) error {
return WalkCursor(ctx, cursor, func(k, v []byte) (bool, error) {
kvs = append(kvs, [2][]byte{k, v})
return nil
return true, nil
})
})
}
12 changes: 6 additions & 6 deletions kv/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,24 +253,24 @@ func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id influxdb
return err
}

return kv.WalkCursor(ctx, cursor, func(k, v []byte) error {
return kv.WalkCursor(ctx, cursor, func(k, v []byte) (bool, error) {
var id influxdb.ID
if err := id.Decode(k); err != nil {
return fmt.Errorf("decoding migration id: %w", err)
return false, fmt.Errorf("decoding migration id: %w", err)
}

var migration Migration
if err := json.Unmarshal(v, &migration); err != nil {
return err
return false, err
}

idx := int(id) - 1
if idx >= len(m.Specs) {
return fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound)
return false, fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound)
}

if spec := m.Specs[idx]; spec.MigrationName() != migration.Name {
return fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name)
return false, fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name)
}

if migration.FinishedAt != nil {
Expand All @@ -279,7 +279,7 @@ func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id influxdb

fn(id, migration)

return nil
return true, nil
})
}); err != nil {
return fmt.Errorf("reading migrations: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions kv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func WithCursorLimit(limit int) CursorOption {

// VisitFunc is called for each k, v byte slice pair from the underlying source bucket
// which are found in the index bucket for a provided foreign key.
type VisitFunc func(k, v []byte) error
type VisitFunc func(k, v []byte) (bool, error)

// WalkCursor consumers the forward cursor call visit for each k/v pair found
func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error) {
Expand All @@ -245,7 +245,7 @@ func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err
}()

for k, v := cursor.Next(); k != nil; k, v = cursor.Next() {
if err := visit(k, v); err != nil {
if cont, err := visit(k, v); !cont || err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions kv/urm.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,17 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in
if filter.UserID.Valid() {
// urm by user index lookup
userID, _ := filter.UserID.Encode()
if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error {
if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) (bool, error) {
m := &influxdb.UserResourceMapping{}
if err := json.Unmarshal(v, m); err != nil {
return CorruptURMError(err)
return false, CorruptURMError(err)
}

if filterFn(m) {
ms = append(ms, m)
}

return nil
return true, nil
}); err != nil {
return nil, err
}
Expand Down
14 changes: 14 additions & 0 deletions paging.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ type FindOptions struct {
Descending bool
}

// GetLimit returns the resolved limit between then limit boundaries.
// Given a limit <= 0 it returns the default limit.
func (f *FindOptions) GetLimit() int {
if f == nil || f.Limit <= 0 {
return DefaultPageSize
}

if f.Limit > MaxPageSize {
return MaxPageSize
}

return f.Limit
}

// DecodeFindOptions returns a FindOptions decoded from http request.
func DecodeFindOptions(r *http.Request) (*FindOptions, error) {
opts := &FindOptions{}
Expand Down
9 changes: 5 additions & 4 deletions secret/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,20 @@ func (s *Storage) ListSecret(ctx context.Context, tx kv.Tx, orgID influxdb.ID) (

keys := []string{}

err = kv.WalkCursor(ctx, cur, func(k, v []byte) error {
err = kv.WalkCursor(ctx, cur, func(k, v []byte) (bool, error) {
id, key, err := decodeSecretKey(k)
if err != nil {
return err
return false, err
}

if id != orgID {
// We've reached the end of the keyspace for the provided orgID
return nil
return false, nil
}

keys = append(keys, key)
return nil

return true, nil
})
if err != nil {
return nil, err
Expand Down
1 change: 0 additions & 1 deletion telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type TelegrafConfigStore interface {
type TelegrafConfigFilter struct {
OrgID *ID
Organization *string
UserResourceMappingFilter
}

// TelegrafConfig stores telegraf config for one telegraf instance.
Expand Down
37 changes: 29 additions & 8 deletions telegraf/service/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,40 @@ func (s *Service) findTelegrafConfigByID(ctx context.Context, tx kv.Tx, id influ
// Additional options provide pagination & sorting.
func (s *Service) FindTelegrafConfigs(ctx context.Context, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) (tcs []*influxdb.TelegrafConfig, n int, err error) {
err = s.kv.View(ctx, func(tx kv.Tx) error {
tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter)
tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter, opt...)
return err
})
return tcs, n, err
}

func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) ([]*influxdb.TelegrafConfig, int, error) {
tcs := make([]*influxdb.TelegrafConfig, 0)
var (
limit = influxdb.DefaultPageSize
offset int
count int
tcs = make([]*influxdb.TelegrafConfig, 0)
)

if len(opt) > 0 {
limit = opt[0].GetLimit()
offset = opt[0].Offset
}

visit := func(k, v []byte) error {
visit := func(k, v []byte) (bool, error) {
var tc influxdb.TelegrafConfig
if err := json.Unmarshal(v, &tc); err != nil {
return err
return false, err
}

tcs = append(tcs, &tc)
// skip until offset reached
if count >= offset {
tcs = append(tcs, &tc)
}

return nil
count++

// stop cursing when limit is reached
return len(tcs) < limit, nil
}

if filter.OrgID == nil {
Expand All @@ -183,8 +198,14 @@ func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter infl
return nil, 0, err
}

// TODO(georgemac): convert find options into cursor options
cursor, err := bucket.ForwardCursor(nil)
// cursors do not support numeric offset
// but we can at least constrain the response
// size by the offset + limit since we are
// not doing any other filtering
// REMOVE this cursor option if you do any
// other filtering

cursor, err := bucket.ForwardCursor(nil, kv.WithCursorLimit(offset+limit))
if err != nil {
return nil, 0, err
}
Expand Down
Loading

0 comments on commit b274e15

Please sign in to comment.