Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist lastFetched control data across launcher restarts #990

Merged
merged 9 commits into from
Feb 1, 2023
6 changes: 5 additions & 1 deletion cmd/launcher/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/kolide/launcher/ee/control"
"github.com/kolide/launcher/pkg/launcher"
"go.etcd.io/bbolt"
)

func createHTTPClient(ctx context.Context, logger log.Logger, opts *launcher.Options) (*control.HTTPClient, error) {
Expand All @@ -29,16 +30,19 @@ func createHTTPClient(ctx context.Context, logger log.Logger, opts *launcher.Opt
return client, nil
}

func createControlService(ctx context.Context, logger log.Logger, opts *launcher.Options) (*control.ControlService, error) {
func createControlService(ctx context.Context, logger log.Logger, db *bbolt.DB, opts *launcher.Options) (*control.ControlService, error) {
level.Debug(logger).Log("msg", "creating control service")

client, err := createHTTPClient(ctx, logger, opts)
if err != nil {
return nil, err
}

getset := control.NewBucketConsumer(logger, db, "control_service_data")

controlOpts := []control.Option{
control.WithRequestInterval(opts.ControlRequestInterval),
control.WithGetterSetter(getset),
}
service := control.New(logger, ctx, client, controlOpts...)

Expand Down
4 changes: 2 additions & 2 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func runLauncher(ctx context.Context, cancel func(), opts *launcher.Options) err
"build", versionInfo.Revision,
)

controlService, err := createControlService(ctx, logger, opts)
controlService, err := createControlService(ctx, logger, db, opts)
if err != nil {
return fmt.Errorf("failed to setup control service: %w", err)
}
Expand All @@ -200,7 +200,7 @@ func runLauncher(ctx context.Context, cancel func(), opts *launcher.Options) err
desktopRunner.WithAuthToken(ulid.New()),
desktopRunner.WithUsersFilesRoot(rootDirectory),
desktopRunner.WithProcessSpawningEnabled(opts.KolideServerURL == "k2device-preprod.kolide.com" || opts.KolideServerURL == "localhost:3443" || strings.HasSuffix(opts.KolideServerURL, "herokuapp.com")),
desktopRunner.WithStoredDataProvider(desktopFlagsBucketConsumer),
desktopRunner.WithGetter(desktopFlagsBucketConsumer),
)
runGroup.Add(runner.Execute, runner.Interrupt)
controlService.RegisterConsumer("kolide_desktop_menu", runner)
Expand Down
30 changes: 29 additions & 1 deletion ee/control/bucket_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func NewBucketConsumer(logger log.Logger, db *bbolt.DB, bucketName string) *buck
}

func (bc *bucketConsumer) Update(data io.Reader) error {
if bc == nil {
return fmt.Errorf("bucketConsumer is nil")
}

var kvPairs map[string]string
if err := json.NewDecoder(data).Decode(&kvPairs); err != nil {
return fmt.Errorf("failed to decode '%s' bucket consumer json: %w", bc.bucketName, err)
Expand Down Expand Up @@ -90,7 +94,11 @@ func (bc *bucketConsumer) Update(data io.Reader) error {
})
}

func (bc *bucketConsumer) GetByKey(key []byte) (value []byte, err error) {
func (bc *bucketConsumer) Get(key []byte) (value []byte, err error) {
if bc == nil {
return nil, fmt.Errorf("bucketConsumer is nil")
}

if err := bc.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bc.bucketName))
if b == nil {
Expand All @@ -105,3 +113,23 @@ func (bc *bucketConsumer) GetByKey(key []byte) (value []byte, err error) {

return value, nil
}

func (bc *bucketConsumer) Set(key, value []byte) error {
if bc == nil {
return fmt.Errorf("bucketConsumer is nil")
}

return bc.db.Update(func(tx *bbolt.Tx) error {
// Either create the bucket, or retrieve the existing one
bucket, err := tx.CreateBucketIfNotExists([]byte(bc.bucketName))
if err != nil {
return fmt.Errorf("creating bucket: %w", err)
}

if err := bucket.Put(key, value); err != nil {
return fmt.Errorf("error putting key-value in bucket: %w", err)
}

return nil
})
}
43 changes: 42 additions & 1 deletion ee/control/bucket_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,55 @@ func Test_Updates(t *testing.T) {
k := row["key"]
v := row["value"]

g, err := bc.GetByKey([]byte(k))
g, err := bc.Get([]byte(k))
assert.NoError(t, err)
assert.Equal(t, []byte(v), g)
}
})
}
}

func Test_GetSet(t *testing.T) {
t.Parallel()

tests := []struct {
name string
sets map[string]string
gets map[string]string
}{
{
name: "empty",
sets: map[string]string{},
gets: map[string]string{},
},
{
name: "multiple",
sets: map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"},
gets: map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

db := createDb(t)
bc := NewBucketConsumer(log.NewNopLogger(), db, tt.name)

for k, v := range tt.sets {
err := bc.Set([]byte(k), []byte(v))
require.NoError(t, err)
}

for k, v := range tt.gets {
val, err := bc.Get([]byte(k))
require.NoError(t, err)
assert.Equal(t, v, string(val))
}
})
}
}

func createDb(t *testing.T) *bbolt.DB {
dir := t.TempDir()

Expand Down
4 changes: 4 additions & 0 deletions ee/control/consumers/notificationconsumer/notify_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func NewNotifyConsumer(db *bbolt.DB, runner *desktopRunner.DesktopUsersProcesses
}

func (nc *NotificationConsumer) Update(data io.Reader) error {
if nc == nil {
return fmt.Errorf("NotificationConsumer is nil")
}

var notificationsToProcess []notification
if err := json.NewDecoder(data).Decode(&notificationsToProcess); err != nil {
return fmt.Errorf("failed to decode notification data: %w", err)
Expand Down
78 changes: 55 additions & 23 deletions ee/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/kolide/launcher/pkg/agent"
)

// ControlService is the main object that manages the control service. It is responsible for fetching
Expand All @@ -19,6 +20,7 @@ type ControlService struct {
cancel context.CancelFunc
requestInterval time.Duration
fetcher dataProvider
getset agent.GetterSetter
lastFetched map[string]string
consumers map[string]consumer
subscribers map[string][]subscriber
Expand All @@ -42,11 +44,6 @@ type dataProvider interface {
Get(hash string) (data io.Reader, err error)
}

// StoredDataProvider is an interface for querying data from a persistable data storage layer
type StoredDataProvider interface {
GetByKey(key []byte) (value []byte, err error)
}

func New(logger log.Logger, ctx context.Context, fetcher dataProvider, opts ...Option) *ControlService {
cs := &ControlService{
logger: logger,
Expand Down Expand Up @@ -119,39 +116,74 @@ func (cs *ControlService) Fetch() error {
}

for subsystem, hash := range subsystems {
if hash == cs.lastFetched[subsystem] {
lastHash, ok := cs.lastFetched[subsystem]
if !ok && cs.getset != nil {
storedHash, err := cs.getset.Get([]byte(subsystem))
if err != nil {
level.Debug(cs.logger).Log(
"msg", "failed to get last fetched hash from stored data",
"subsystem", subsystem,
"err", err)
// This isn't a fatal error
} else {
lastHash = string(storedHash)
}
}

if hash == lastHash {
// The last fetched update is still fresh
// Nothing to do, skip to the next subsystem
continue
}

data, err := cs.fetcher.Get(hash)
if err != nil {
return fmt.Errorf("failed to get control data: %w", err)
if err = cs.fetchAndUpdate(subsystem, hash); err != nil {
return err
}
}

if data == nil {
return fmt.Errorf("control data is nil")
}
level.Debug(cs.logger).Log("msg", "control data fetch complete")

return nil
}

// Fetches latest subsystem data, and notifies observers of updates.
func (cs *ControlService) fetchAndUpdate(subsystem, hash string) error {
data, err := cs.fetcher.Get(hash)
if err != nil {
return fmt.Errorf("failed to get control data: %w", err)
}

// Consumer and subscriber(s) notified now
err = cs.update(subsystem, data)
if data == nil {
return fmt.Errorf("control data is nil")
}

// Consumer and subscriber(s) notified now
err = cs.update(subsystem, data)
if err != nil {
level.Error(cs.logger).Log(
"msg", "failed to update consumers and subscribers",
"subsystem", subsystem,
"err", err)
// Although we failed to update, the payload may be bad and there's no
// sense in repeatedly attempting to apply a bad update.
// A new update will have a new hash, so continue and remember this hash.
}

// Remember the hash of the last fetched version of this subsystem's data
cs.lastFetched[subsystem] = hash

if cs.getset != nil {
// Store the hash so we can persist the last fetched data across launcher restarts
err = cs.getset.Set([]byte(subsystem), []byte(hash))
if err != nil {
level.Error(cs.logger).Log(
"msg", "failed to update consumers and subscribers",
"msg", "failed to store last fetched control data",
"subsystem", subsystem,
"err", err)
// Although we failed to update, the payload may be bad and there's no
// sense in repeatedly attempting to apply a bad update.
// A new update will have a new hash, so continue and remember this hash.
}

// Remember the hash of the last fetched version of this subsystem's data
cs.lastFetched[subsystem] = hash
}
}

level.Debug(cs.logger).Log("msg", "control data fetch complete")

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions ee/control/control_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@ package control

import (
"time"

"github.com/kolide/launcher/pkg/agent"
)

type Option func(*ControlService)

// WithUpdateInterval sets the interval on which the control service will request updates from k2
func WithRequestInterval(interval time.Duration) Option {
return func(c *ControlService) {
c.requestInterval = interval
}
}

// WithGetterSetter sets the key/value getset for control data
func WithGetterSetter(getset agent.GetterSetter) Option {
return func(c *ControlService) {
c.getset = getset
}
}
Loading