Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote restart consumer to handle remote restart actions #1948

Merged
merged 9 commits into from
Nov 12, 2024
5 changes: 5 additions & 0 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/kolide/launcher/ee/control/consumers/flareconsumer"
"github.com/kolide/launcher/ee/control/consumers/keyvalueconsumer"
"github.com/kolide/launcher/ee/control/consumers/notificationconsumer"
"github.com/kolide/launcher/ee/control/consumers/remoterestartconsumer"
"github.com/kolide/launcher/ee/control/consumers/uninstallconsumer"
"github.com/kolide/launcher/ee/debug/checkups"
desktopRunner "github.com/kolide/launcher/ee/desktop/runner"
Expand Down Expand Up @@ -469,6 +470,10 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
// register notifications consumer
actionsQueue.RegisterActor(notificationconsumer.NotificationSubsystem, notificationConsumer)

remoteRestartConsumer := remoterestartconsumer.New(k)
runGroup.Add("remoteRestart", remoteRestartConsumer.Execute, remoteRestartConsumer.Interrupt)
actionsQueue.RegisterActor(remoterestartconsumer.RemoteRestartActorType, remoteRestartConsumer)

// Set up our tracing instrumentation
authTokenConsumer := keyvalueconsumer.New(k.TokenStore())
if err := controlService.RegisterConsumer(authTokensSubsystemName, authTokenConsumer); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/launcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kolide/kit/env"
"github.com/kolide/kit/logutil"
"github.com/kolide/kit/version"
"github.com/kolide/launcher/ee/control/consumers/remoterestartconsumer"
"github.com/kolide/launcher/ee/tuf"
"github.com/kolide/launcher/ee/watchdog"
"github.com/kolide/launcher/pkg/contexts/ctxlog"
Expand Down Expand Up @@ -153,11 +154,11 @@ func runMain() int {
ctx = ctxlog.NewContext(ctx, logger)

if err := runLauncher(ctx, cancel, slogger, systemSlogger, opts); err != nil {
if !tuf.IsLauncherReloadNeededErr(err) {
if !tuf.IsLauncherReloadNeededErr(err) && !remoterestartconsumer.IsRemoteRestartRequestedErr(err) {
level.Debug(logger).Log("msg", "run launcher", "stack", fmt.Sprintf("%+v", err))
return 1
}
level.Debug(logger).Log("msg", "runLauncher exited to run newer version of launcher", "err", err.Error())
level.Debug(logger).Log("msg", "runLauncher exited to reload launcher", "err", err.Error())
if err := runNewerLauncherIfAvailable(ctx, slogger.Logger); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works on macOS/Linux unless there is nothing to be found in the autoupdate library, in which case launcher will exit and we would have to rely on the init system (launchd/systemd) to restart launcher. (On Windows, we always exit completely and rely on the Service Manager to restart launcher.)

This seems like an okay tradeoff to me, since we will be doing this action only rarely as a troubleshooting step, but I wanted to flag it in case others disagree.

return 1
}
Expand Down
35 changes: 35 additions & 0 deletions ee/control/consumers/remoterestartconsumer/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package remoterestartconsumer

import (
"errors"
"fmt"
)

// ErrRemoteRestartRequested is returned to the main launcher rungroup when
// a remote restart has been requested.
type ErrRemoteRestartRequested struct {
msg string
runId string
}

func NewRemoteRestartRequestedErr(runId string) ErrRemoteRestartRequested {
return ErrRemoteRestartRequested{
msg: "need to reload launcher: remote restart requested",
runId: runId,
}
}

func (e ErrRemoteRestartRequested) Error() string {
return fmt.Sprintf("%s (run_id %s)", e.msg, e.runId)
}

func (e ErrRemoteRestartRequested) Is(target error) bool {
if _, ok := target.(ErrRemoteRestartRequested); ok {
return true
}
return false
}

func IsRemoteRestartRequestedErr(err error) bool {
return errors.Is(err, ErrRemoteRestartRequested{})
}
125 changes: 125 additions & 0 deletions ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package remoterestartconsumer

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"time"

"github.com/kolide/launcher/ee/agent/types"
)

const (
// RemoteRestartActorType identifies this action/actor type, which performs
// a launcher restart when requested by the control server. This actor type
// belongs to the action subsystem.
RemoteRestartActorType = "remote_restart"

// restartDelay is the delay after receiving action before triggering the restart.
// We have a delay to allow the actionqueue.
restartDelay = 15 * time.Second
)

type RemoteRestartConsumer struct {
knapsack types.Knapsack
slogger *slog.Logger
signalRestart chan error
interrupt chan struct{}
interrupted bool
}

type remoteRestartAction struct {
RunID string `json:"run_id"` // the run ID for the launcher run to restart
}

func New(knapsack types.Knapsack) *RemoteRestartConsumer {
return &RemoteRestartConsumer{
knapsack: knapsack,
slogger: knapsack.Slogger().With("component", "remote_restart_consumer"),
signalRestart: make(chan error, 1),
interrupt: make(chan struct{}, 1),
}
}

// Do implements the `actionqueue.actor` interface, and allows the actionqueue
// to pass `remote_restart` type actions to this consumer. The actionqueue validates
// that this action has not already been performed and that this action is still
// valid (i.e. not expired). `Do` additionally validates that the `run_id` given in
// the action matches the current launcher run ID.
func (r *RemoteRestartConsumer) Do(data io.Reader) error {
var restartAction remoteRestartAction

if err := json.NewDecoder(data).Decode(&restartAction); err != nil {
return fmt.Errorf("decoding restart action: %w", err)
}

// The action's run ID indicates the current `runLauncher` that should be restarted.
// If the action's run ID does not match the current run ID, we assume the restart
// has already happened and does not need to happen again.
if restartAction.RunID == "" {
r.slogger.Log(context.TODO(), slog.LevelInfo,
"received remote restart action with blank launcher run ID -- discarding",
)
return nil
}
if restartAction.RunID != r.knapsack.GetRunID() {
r.slogger.Log(context.TODO(), slog.LevelInfo,
"received remote restart action for incorrect (assuming past) launcher run ID -- discarding",
"action_run_id", restartAction.RunID,
)
return nil
}

// Perform the restart by signaling actor shutdown, but delay slightly to give
// the actionqueue a chance to process all actions and store their statuses.
go func() {
r.slogger.Log(context.TODO(), slog.LevelInfo,
"received remote restart action for current launcher run ID -- signaling for restart shortly",
"action_run_id", restartAction.RunID,
"restart_delay", restartDelay.String(),
)

select {
case <-r.interrupt:
r.slogger.Log(context.TODO(), slog.LevelDebug,
"received external interrupt before remote restart could be performed",
)
return
case <-time.After(restartDelay):
r.signalRestart <- NewRemoteRestartRequestedErr(restartAction.RunID)
r.slogger.Log(context.TODO(), slog.LevelInfo,
"signaled for restart after delay",
"action_run_id", restartAction.RunID,
)
return
}
}()

return nil
}

// Execute allows the remote restart consumer to run in the main launcher rungroup.
// It waits until it receives a remote restart action from `Do`, or until it receives
// a `Interrupt` request.
func (r *RemoteRestartConsumer) Execute() (err error) {
select {
case <-r.interrupt:
return nil
case signalRestartErr := <-r.signalRestart:
return signalRestartErr
}
}

// Interrupt allows the remote restart consumer to run in the main launcher rungroup
// and be shut down when the rungroup shuts down.
func (r *RemoteRestartConsumer) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if r.interrupted {
return
}
r.interrupted = true

r.interrupt <- struct{}{}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package remoterestartconsumer

import (
"bytes"
"encoding/json"
"errors"
"testing"
"time"

"github.com/kolide/kit/ulid"
typesmocks "github.com/kolide/launcher/ee/agent/types/mocks"
"github.com/kolide/launcher/pkg/log/multislogger"
"github.com/stretchr/testify/require"
)

func TestDo(t *testing.T) {
t.Parallel()

currentRunId := ulid.New()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())
mockKnapsack.On("GetRunID").Return(currentRunId)

remoteRestarter := New(mockKnapsack)

testAction := remoteRestartAction{
RunID: currentRunId,
}
testActionRaw, err := json.Marshal(testAction)
require.NoError(t, err)

// We don't expect an error because we should process the action
require.NoError(t, remoteRestarter.Do(bytes.NewReader(testActionRaw)), "expected no error processing valid remote restart action")

// The restarter should delay before sending an error to `signalRestart`
require.Len(t, remoteRestarter.signalRestart, 0, "expected restarter to delay before signal for restart but channel is already has item in it")
time.Sleep(restartDelay + 2*time.Second)
require.Len(t, remoteRestarter.signalRestart, 1, "expected restarter to signal for restart but channel is empty after delay")
}

func TestDo_DoesNotSignalRestartWhenRunIDDoesNotMatch(t *testing.T) {
t.Parallel()

currentRunId := ulid.New()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())
mockKnapsack.On("GetRunID").Return(currentRunId)

remoteRestarter := New(mockKnapsack)

testAction := remoteRestartAction{
RunID: ulid.New(), // run ID will not match `currentRunId`
}
testActionRaw, err := json.Marshal(testAction)
require.NoError(t, err)

// We don't expect an error because we want to discard this action
require.NoError(t, remoteRestarter.Do(bytes.NewReader(testActionRaw)), "should not return error for old run ID")

// The restarter should not send an error to `signalRestart`
time.Sleep(restartDelay + 2*time.Second)
require.Len(t, remoteRestarter.signalRestart, 0, "restarter should not have signaled for a restart, but channel is not empty")
}

func TestDo_DoesNotSignalRestartWhenRunIDIsEmpty(t *testing.T) {
t.Parallel()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())

remoteRestarter := New(mockKnapsack)

testAction := remoteRestartAction{
RunID: "", // run ID is empty
}
testActionRaw, err := json.Marshal(testAction)
require.NoError(t, err)

// We don't expect an error because we want to discard this action
require.NoError(t, remoteRestarter.Do(bytes.NewReader(testActionRaw)), "should not return error for empty run ID")

// The restarter should not send an error to `signalRestart`
time.Sleep(restartDelay + 2*time.Second)
require.Len(t, remoteRestarter.signalRestart, 0, "restarter should not have signaled for a restart, but channel is not empty")
}

func TestDo_DoesNotRestartIfInterruptedDuringDelay(t *testing.T) {
t.Parallel()

currentRunId := ulid.New()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())
mockKnapsack.On("GetRunID").Return(currentRunId)

remoteRestarter := New(mockKnapsack)

testAction := remoteRestartAction{
RunID: currentRunId,
}
testActionRaw, err := json.Marshal(testAction)
require.NoError(t, err)

// We don't expect an error because the run ID is correct
require.NoError(t, remoteRestarter.Do(bytes.NewReader(testActionRaw)), "expected no error processing valid remote restart action")

// The restarter should delay before sending an error to `signalRestart`
require.Len(t, remoteRestarter.signalRestart, 0, "expected restarter to delay before signal for restart but channel is already has item in it")

// Now, send an interrupt
remoteRestarter.Interrupt(errors.New("test error"))

// Sleep beyond the interrupt delay, and confirm we don't try to do a restart when we're already shutting down
time.Sleep(restartDelay + 2*time.Second)
require.Len(t, remoteRestarter.signalRestart, 0, "restarter should not have tried to signal for restart when interrupted during restart delay")
}

func TestInterrupt_Multiple(t *testing.T) {
t.Parallel()

mockKnapsack := typesmocks.NewKnapsack(t)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())

remoteRestarter := New(mockKnapsack)

// Let the remote restarter run for a bit
go remoteRestarter.Execute()
time.Sleep(3 * time.Second)
remoteRestarter.Interrupt(errors.New("test error"))

// Confirm we can call Interrupt multiple times without blocking
interruptComplete := make(chan struct{})
expectedInterrupts := 3
for i := 0; i < expectedInterrupts; i += 1 {
go func() {
remoteRestarter.Interrupt(nil)
interruptComplete <- struct{}{}
}()
}

receivedInterrupts := 0
for {
if receivedInterrupts >= expectedInterrupts {
break
}

select {
case <-interruptComplete:
receivedInterrupts += 1
continue
case <-time.After(5 * time.Second):
t.Errorf("could not call interrupt multiple times and return within 5 seconds -- received %d interrupts before timeout", receivedInterrupts)
t.FailNow()
}
}

require.Equal(t, expectedInterrupts, receivedInterrupts)
}
Loading