Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote restart consumer to handle remote restart actions #1948

Merged
merged 9 commits into from
Nov 12, 2024
5 changes: 5 additions & 0 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/kolide/launcher/ee/control/consumers/flareconsumer"
"github.com/kolide/launcher/ee/control/consumers/keyvalueconsumer"
"github.com/kolide/launcher/ee/control/consumers/notificationconsumer"
"github.com/kolide/launcher/ee/control/consumers/remoterestartconsumer"
"github.com/kolide/launcher/ee/control/consumers/uninstallconsumer"
"github.com/kolide/launcher/ee/debug/checkups"
desktopRunner "github.com/kolide/launcher/ee/desktop/runner"
Expand Down Expand Up @@ -469,6 +470,10 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
// register notifications consumer
actionsQueue.RegisterActor(notificationconsumer.NotificationSubsystem, notificationConsumer)

remoteRestartConsumer := remoterestartconsumer.New(k)
runGroup.Add("remoteRestart", remoteRestartConsumer.Execute, remoteRestartConsumer.Shutdown)
actionsQueue.RegisterActor(remoterestartconsumer.RemoteRestartActorType, remoteRestartConsumer)

// Set up our tracing instrumentation
authTokenConsumer := keyvalueconsumer.New(k.TokenStore())
if err := controlService.RegisterConsumer(authTokensSubsystemName, authTokenConsumer); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/launcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kolide/kit/env"
"github.com/kolide/kit/logutil"
"github.com/kolide/kit/version"
"github.com/kolide/launcher/ee/control/consumers/remoterestartconsumer"
"github.com/kolide/launcher/ee/tuf"
"github.com/kolide/launcher/ee/watchdog"
"github.com/kolide/launcher/pkg/contexts/ctxlog"
Expand Down Expand Up @@ -153,11 +154,11 @@ func runMain() int {
ctx = ctxlog.NewContext(ctx, logger)

if err := runLauncher(ctx, cancel, slogger, systemSlogger, opts); err != nil {
if !tuf.IsLauncherReloadNeededErr(err) {
if !tuf.IsLauncherReloadNeededErr(err) && !remoterestartconsumer.IsRemoteRestartRequestedErr(err) {
level.Debug(logger).Log("msg", "run launcher", "stack", fmt.Sprintf("%+v", err))
return 1
}
level.Debug(logger).Log("msg", "runLauncher exited to run newer version of launcher", "err", err.Error())
level.Debug(logger).Log("msg", "runLauncher exited to reload launcher", "err", err.Error())
if err := runNewerLauncherIfAvailable(ctx, slogger.Logger); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works on macOS/Linux unless there is nothing to be found in the autoupdate library, in which case launcher will exit and we would have to rely on the init system (launchd/systemd) to restart launcher. (On Windows, we always exit completely and rely on the Service Manager to restart launcher.)

This seems like an okay tradeoff to me, since we will be doing this action only rarely as a troubleshooting step, but I wanted to flag it in case others disagree.

return 1
}
Expand Down
30 changes: 30 additions & 0 deletions ee/control/consumers/remoterestartconsumer/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package remoterestartconsumer

import (
"errors"
)

type RemoteRestartRequested struct {
msg string
}

func NewRemoteRestartRequested() RemoteRestartRequested {
return RemoteRestartRequested{
msg: "need to reload launcher: remote restart requested",
}
}

func (e RemoteRestartRequested) Error() string {
return e.msg
}

func (e RemoteRestartRequested) Is(target error) bool {
if _, ok := target.(RemoteRestartRequested); ok {
return true
}
return false
}

func IsRemoteRestartRequestedErr(err error) bool {
return errors.Is(err, RemoteRestartRequested{})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package remoterestartconsumer

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"time"

"github.com/kolide/launcher/ee/agent/types"
)

const (
// Identifier for this consumer.
RemoteRestartActorType = "remote_restart"

restartDelay = 15 * time.Second
)

type RemoteRestartConsumer struct {
knapsack types.Knapsack
slogger *slog.Logger
signalRestart chan error
interrupt chan struct{}
interrupted bool
}

type remoteRestartAction struct {
RunID string `json:"run_id"` // the run ID for the launcher run to restart
}

func New(knapsack types.Knapsack) *RemoteRestartConsumer {
return &RemoteRestartConsumer{
knapsack: knapsack,
slogger: knapsack.Slogger().With("component", "remote_restart_consumer"),
signalRestart: make(chan error, 1),
interrupt: make(chan struct{}, 1),
}
}

func (r *RemoteRestartConsumer) Do(data io.Reader) error {
var restartAction remoteRestartAction

if err := json.NewDecoder(data).Decode(&restartAction); err != nil {
return fmt.Errorf("decoding restart action: %w", err)
}

// The action's run ID indicates the current `runLauncher` that should be restarted.
// If the action's run ID does not match the current run ID, we assume the restart
// has already happened and does not need to happen again.
if restartAction.RunID != r.knapsack.GetRunID() {
r.slogger.Log(context.TODO(), slog.LevelInfo,
"received remote restart action for incorrect (assuming past) launcher run ID -- discarding",
"run_id", restartAction.RunID,
)
return nil
}

// Perform the restart by signaling actor shutdown, but delay slightly to give
// the actionqueue a chance to process all actions and store their statuses.
go func() {
r.slogger.Log(context.TODO(), slog.LevelInfo,
"received remote restart action for current launcher run ID -- signaling for restart shortly",
"run_id", restartAction.RunID,
"restart_delay", restartDelay.String(),
)

time.Sleep(restartDelay)

r.signalRestart <- NewRemoteRestartRequested()
}()

return nil
}

func (r *RemoteRestartConsumer) Execute() (err error) {
// Wait until we receive a remote restart action, or until we receive a Shutdown request
select {
case <-r.interrupt:
return nil
case signalRestartErr := <-r.signalRestart:
return signalRestartErr
}
}

func (r *RemoteRestartConsumer) Shutdown(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if r.interrupted {
return
}
r.interrupted = true

r.interrupt <- struct{}{}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package remoterestartconsumer

import (
"bytes"
"encoding/json"
"testing"
"time"

"github.com/kolide/kit/ulid"
typesmocks "github.com/kolide/launcher/ee/agent/types/mocks"
"github.com/kolide/launcher/pkg/log/multislogger"
"github.com/stretchr/testify/require"
)

func TestDo(t *testing.T) {
t.Parallel()

currentRunId := ulid.New()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())
mockKnapsack.On("GetRunID").Return(currentRunId)

remoteRestarter := New(mockKnapsack)

testAction := remoteRestartAction{
RunID: currentRunId,
}
testActionRaw, err := json.Marshal(testAction)
require.NoError(t, err)

// We don't expect an error because we should process the action
require.NoError(t, remoteRestarter.Do(bytes.NewReader(testActionRaw)), "expected no error processing valid remote restart action")

// The restarter should delay before sending an error to `signalRestart`
require.Len(t, remoteRestarter.signalRestart, 0, "expected restarter to delay before signal for restart but channel is already has item in it")
time.Sleep(restartDelay + 2*time.Second)
require.Len(t, remoteRestarter.signalRestart, 1, "expected restarter to signal for restart but channel is empty after delay")
}

func TestDo_DoesNotSignalRestartWhenRunIDDoesNotMatch(t *testing.T) {
t.Parallel()

currentRunId := ulid.New()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())
mockKnapsack.On("GetRunID").Return(currentRunId)

remoteRestarter := New(mockKnapsack)

testAction := remoteRestartAction{
RunID: ulid.New(), // run ID will not match `currentRunId`
}
testActionRaw, err := json.Marshal(testAction)
require.NoError(t, err)

// We don't expect an error because we want to discard this action
require.NoError(t, remoteRestarter.Do(bytes.NewReader(testActionRaw)), "should not return error for old run ID")

// The restarter should not send an error to `signalRestart`
time.Sleep(restartDelay + 2*time.Second)
require.Len(t, remoteRestarter.signalRestart, 0, "restarter should not have signaled for a restart, but channel is not empty")
}

func TestDo_DoesNotSignalRestartWhenRunIDIsEmpty(t *testing.T) {
t.Parallel()

currentRunId := ulid.New()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())
mockKnapsack.On("GetRunID").Return(currentRunId)

remoteRestarter := New(mockKnapsack)

testAction := remoteRestartAction{
RunID: "", // run ID will not match `currentRunId`
}
testActionRaw, err := json.Marshal(testAction)
require.NoError(t, err)

// We don't expect an error because we want to discard this action
require.NoError(t, remoteRestarter.Do(bytes.NewReader(testActionRaw)), "should not return error for empty run ID")

// The restarter should not send an error to `signalRestart`
time.Sleep(restartDelay + 2*time.Second)
require.Len(t, remoteRestarter.signalRestart, 0, "restarter should not have signaled for a restart, but channel is not empty")
}
Loading