Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ee/control and sub-packages to use new slogger #1528

Merged
merged 5 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions cmd/launcher/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package main
import (
"context"
"fmt"
"log/slog"
"net/http"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/ee/control"
)

func createHTTPClient(ctx context.Context, logger log.Logger, k types.Knapsack) (*control.HTTPClient, error) {
level.Debug(logger).Log("msg", "creating control http client")
func createHTTPClient(ctx context.Context, k types.Knapsack) (*control.HTTPClient, error) {
k.Slogger().Log(context.TODO(), slog.LevelDebug,
"creating control http client",
)

clientOpts := []control.HTTPClientOption{}
if k.InsecureControlTLS() {
Expand All @@ -21,26 +22,28 @@ func createHTTPClient(ctx context.Context, logger log.Logger, k types.Knapsack)
if k.DisableControlTLS() {
clientOpts = append(clientOpts, control.WithDisableTLS())
}
client, err := control.NewControlHTTPClient(logger, k.ControlServerURL(), http.DefaultClient, clientOpts...)
client, err := control.NewControlHTTPClient(k.ControlServerURL(), http.DefaultClient, clientOpts...)
if err != nil {
return nil, fmt.Errorf("creating control http client: %w", err)
}

return client, nil
}

func createControlService(ctx context.Context, logger log.Logger, store types.GetterSetter, k types.Knapsack) (*control.ControlService, error) {
level.Debug(logger).Log("msg", "creating control service")
func createControlService(ctx context.Context, store types.GetterSetter, k types.Knapsack) (*control.ControlService, error) {
k.Slogger().Log(context.TODO(), slog.LevelDebug,
"creating control service",
)

client, err := createHTTPClient(ctx, logger, k)
client, err := createHTTPClient(ctx, k)
if err != nil {
return nil, err
}

controlOpts := []control.Option{
control.WithStore(k.ControlStore()),
}
service := control.New(logger, k, client, controlOpts...)
service := control.New(k, client, controlOpts...)

return service, nil
}
2 changes: 1 addition & 1 deletion cmd/launcher/flare.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func runFlare(args []string) error {

logger := log.NewLogfmtLogger(os.Stdout)
fcOpts := []flags.Option{flags.WithCmdLineOpts(opts)}
flagController := flags.NewFlagController(logger, inmemory.NewStore(logger), fcOpts...)
flagController := flags.NewFlagController(logger, inmemory.NewStore(), fcOpts...)

k := knapsack.New(nil, flagController, nil, nil, nil)
ctx := context.Background()
Expand Down
6 changes: 3 additions & 3 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func runLauncher(ctx context.Context, cancel func(), slogger, systemSlogger *mul
if k.ControlServerURL() == "" {
level.Debug(logger).Log("msg", "control server URL not set, will not create control service")
} else {
controlService, err := createControlService(ctx, logger, k.ControlStore(), k)
controlService, err := createControlService(ctx, k.ControlStore(), k)
if err != nil {
return fmt.Errorf("failed to setup control service: %w", err)
}
Expand All @@ -346,8 +346,8 @@ func runLauncher(ctx context.Context, cancel func(), slogger, systemSlogger *mul

// create an action queue for all other action style commands
actionsQueue := actionqueue.New(
k,
actionqueue.WithContext(ctx),
actionqueue.WithLogger(logger),
actionqueue.WithStore(k.ControlServerActionsStore()),
actionqueue.WithOldNotificationsStore(k.SentNotificationsStore()),
)
Expand All @@ -363,8 +363,8 @@ func runLauncher(ctx context.Context, cancel func(), slogger, systemSlogger *mul
// create notification consumer
notificationConsumer, err := notificationconsumer.NewNotifyConsumer(
ctx,
k,
runner,
notificationconsumer.WithLogger(logger),
)
if err != nil {
return fmt.Errorf("failed to set up notifier: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion ee/agent/storage/ci/keyvalue_store_ci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (

func NewStore(t *testing.T, logger log.Logger, bucketName string) (types.KVStore, error) {
if os.Getenv("CI") == "true" {
return inmemory.NewStore(logger), nil
return inmemory.NewStore(), nil
}

return agentbbolt.NewStore(logger, SetupDB(t), bucketName)
Expand Down
2 changes: 1 addition & 1 deletion ee/agent/storage/ci/keyvalue_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func getStores(t *testing.T) []types.KVStore {
require.NoError(t, err)

stores := []types.KVStore{
inmemory.NewStore(logger),
inmemory.NewStore(),
bboltStore,
}
return stores
Expand Down
6 changes: 3 additions & 3 deletions ee/agent/storage/ci/stores_ci.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ func MakeStores(t *testing.T, logger log.Logger, db *bbolt.DB) (map[storage.Stor
}

if os.Getenv("CI") == "true" {
return makeInMemoryStores(t, logger, storeNames), nil
return makeInMemoryStores(t, storeNames), nil
}

return makeBboltStores(t, logger, db, storeNames)
}

func makeInMemoryStores(t *testing.T, logger log.Logger, storeNames []storage.Store) map[storage.Store]types.KVStore {
func makeInMemoryStores(t *testing.T, storeNames []storage.Store) map[storage.Store]types.KVStore {
stores := make(map[storage.Store]types.KVStore)

for _, storeName := range storeNames {
stores[storeName] = inmemory.NewStore(logger)
stores[storeName] = inmemory.NewStore()
}

return stores
Expand Down
12 changes: 4 additions & 8 deletions ee/agent/storage/inmemory/keyvalue_store_in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@ package inmemory
import (
"errors"
"sync"

"github.com/go-kit/kit/log"
)

type inMemoryKeyValueStore struct {
logger log.Logger
mu sync.RWMutex
items map[string][]byte
mu sync.RWMutex
items map[string][]byte
}

func NewStore(logger log.Logger) *inMemoryKeyValueStore {
func NewStore() *inMemoryKeyValueStore {
s := &inMemoryKeyValueStore{
logger: logger,
items: make(map[string][]byte),
items: make(map[string][]byte),
}

return s
Expand Down
82 changes: 56 additions & 26 deletions ee/control/actionqueue/actionqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/kolide/launcher/ee/agent/storage/inmemory"
"github.com/kolide/launcher/ee/agent/types"
)
Expand All @@ -34,24 +33,22 @@ type action struct {
ProcessedAt time.Time `json:"processed_at,omitempty"`
}

func (a action) String() string {
return fmt.Sprintf("ID: %s; type: %s; valid until: %d", a.ID, a.Type, a.ValidUntil)
}

type actionqueue struct {
ctx context.Context // nolint:containedctx
actors map[string]actor
store types.KVStore
oldNotificationsStore types.KVStore
logger log.Logger
slogger *slog.Logger
actionCleanupInterval time.Duration
cancel context.CancelFunc
}

type actionqueueOption func(*actionqueue)

func WithLogger(logger log.Logger) actionqueueOption {
return func(aq *actionqueue) {
aq.logger = logger
}
}

func WithStore(store types.KVStore) actionqueueOption {
return func(aq *actionqueue) {
aq.store = store
Expand All @@ -76,24 +73,22 @@ func WithContext(ctx context.Context) actionqueueOption {
}
}

func New(opts ...actionqueueOption) *actionqueue {
func New(k types.Knapsack, opts ...actionqueueOption) *actionqueue {
aq := &actionqueue{
ctx: context.Background(),
actors: make(map[string]actor, 0),
actionCleanupInterval: defaultCleanupInterval,
logger: log.NewNopLogger(),
slogger: k.Slogger().With("component", "actionqueue"),
}

for _, opt := range opts {
opt(aq)
}

if aq.store == nil {
aq.store = inmemory.NewStore(aq.logger)
aq.store = inmemory.NewStore()
}

aq.logger = log.With(aq.logger, "component", "actionqueue")

return aq
}

Expand All @@ -108,7 +103,10 @@ func (aq *actionqueue) Update(data io.Reader) error {
for _, rawAction := range rawActionsToProcess {
var action action
if err := json.Unmarshal(rawAction, &action); err != nil {
level.Debug(aq.logger).Log("msg", "received action in unexpected format from K2, discarding", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelWarn,
"received action in unexpected format from K2, discarding",
"err", err,
)
continue
}

Expand All @@ -118,12 +116,18 @@ func (aq *actionqueue) Update(data io.Reader) error {

actor, err := aq.actorForAction(action)
if err != nil {
level.Info(aq.logger).Log("msg", "getting actor for action", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelInfo,
"getting actor for action",
"err", err,
)
continue
}

if err := actor.Do(bytes.NewReader(rawAction)); err != nil {
level.Info(aq.logger).Log("msg", "failed to do action with action, not marking action complete", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelInfo,
"failed to do action with action, not marking action complete",
"err", err,
)
continue
}

Expand Down Expand Up @@ -154,7 +158,9 @@ func (aq *actionqueue) runCleanup() {
for {
select {
case <-ctx.Done():
level.Debug(aq.logger).Log("msg", "action cleanup stopped due to context cancel")
aq.slogger.Log(context.TODO(), slog.LevelDebug,
"action cleanup stopped due to context cancel",
)
return
case <-t.C:
aq.cleanupActions()
Expand All @@ -169,19 +175,28 @@ func (aq *actionqueue) StopCleanup(err error) {
func (aq *actionqueue) storeActionRecord(actionToStore action) {
rawAction, err := json.Marshal(actionToStore)
if err != nil {
level.Error(aq.logger).Log("msg", "could not marshal complete action", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelError,
"could not marshal complete action",
"err", err,
)
return
}

if err := aq.store.Set([]byte(actionToStore.ID), rawAction); err != nil {
level.Debug(aq.logger).Log("msg", "could not mark action complete", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelWarn,
"could not mark action complete",
"err", err,
)
}
}

func (aq *actionqueue) isActionNew(id string) bool {
completedActionRaw, err := aq.store.Get([]byte(id))
if err != nil {
level.Error(aq.logger).Log("msg", "could not read action from bucket", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelError,
"could not read action from bucket",
"err", err,
)
return false
}

Expand All @@ -206,7 +221,10 @@ func (aq *actionqueue) isActionNew(id string) bool {

completedActionRaw, err = aq.oldNotificationsStore.Get([]byte(id))
if err != nil {
level.Error(aq.logger).Log("msg", "could not read action from old notifications store", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelError,
"could not read action from old notifications store",
"err", err,
)
return false
}

Expand All @@ -216,12 +234,18 @@ func (aq *actionqueue) isActionNew(id string) bool {

func (aq *actionqueue) isActionValid(a action) bool {
if a.ID == "" {
level.Info(aq.logger).Log("msg", "action ID is empty", "action", a)
aq.slogger.Log(context.TODO(), slog.LevelWarn,
"action ID is empty",
"action", a.String(),
)
return false
}

if a.ValidUntil <= 0 {
level.Info(aq.logger).Log("msg", "action valid until is empty", "action", a)
aq.slogger.Log(context.TODO(), slog.LevelWarn,
"action valid until is empty",
"action", a.String(),
)
return false
}

Expand Down Expand Up @@ -261,11 +285,17 @@ func (aq *actionqueue) cleanupActions() {

return nil
}); err != nil {
level.Debug(aq.logger).Log("msg", "could not iterate over bucket items to determine which are expired", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelWarn,
"could not iterate over bucket items to determine which are expired",
"err", err,
)
}

// Delete all old keys
if err := aq.store.Delete(keysToDelete...); err != nil {
level.Debug(aq.logger).Log("msg", "could not delete old actions from bucket", "err", err)
aq.slogger.Log(context.TODO(), slog.LevelWarn,
"could not delete old actions from bucket",
"err", err,
)
}
}
Loading
Loading