From d24fc556df38ffcd5cbca29e9b32b1f92bbac241 Mon Sep 17 00:00:00 2001 From: sebv Date: Fri, 3 Nov 2023 17:48:11 +0800 Subject: [PATCH] adds async events --- internal/cmd/run/imagerunner.go | 13 +- internal/http/imagerunner.go | 36 +++++ internal/imagerunner/async.go | 202 +++++++++++++++++++++++++++++ internal/imagerunner/async_test.go | 79 +++++++++++ internal/saucecloud/imagerunner.go | 56 +++++++- 5 files changed, 379 insertions(+), 7 deletions(-) create mode 100644 internal/imagerunner/async.go create mode 100644 internal/imagerunner/async_test.go diff --git a/internal/cmd/run/imagerunner.go b/internal/cmd/run/imagerunner.go index cf252f02f..08be24b40 100644 --- a/internal/cmd/run/imagerunner.go +++ b/internal/cmd/run/imagerunner.go @@ -69,13 +69,14 @@ func runImageRunner(cmd *cobra.Command) (int, error) { imageRunnerClient := http.NewImageRunner(regio.APIBaseURL(), creds, imgExecTimeout) restoClient := http.NewResto(regio.APIBaseURL(), creds.Username, creds.AccessKey, 0) - r := saucecloud.ImgRunner{ - Project: p, - RunnerService: &imageRunnerClient, - TunnelService: &restoClient, - Reporters: reporters, - Async: gFlags.async, + asyncEventManager, err := imagerunner.NewAsyncEventManager() + if err != nil { + return 1, err } + + r := saucecloud.NewImgRunner(p, &imageRunnerClient, &restoClient, reporters, + asyncEventManager, gFlags.async) + return r.RunProject() } diff --git a/internal/http/imagerunner.go b/internal/http/imagerunner.go index ce5d52cd5..38963069a 100644 --- a/internal/http/imagerunner.go +++ b/internal/http/imagerunner.go @@ -10,7 +10,9 @@ import ( "strings" "time" + "github.com/gorilla/websocket" "github.com/hashicorp/go-retryablehttp" + "github.com/rs/zerolog/log" "github.com/saucelabs/saucectl/internal/iam" "github.com/saucelabs/saucectl/internal/imagerunner" ) @@ -204,6 +206,40 @@ func (c *ImageRunner) GetLogs(ctx context.Context, id string) (string, error) { return c.doGetStr(ctx, urlResponse.URL) } +func (c *ImageRunner) getWebsocketUrl() string { + + wsUrl := c.URL + wsUrl = strings.Replace(wsUrl, "https://", "wss://", 1) + wsUrl = strings.Replace(wsUrl, "http://", "ws://", 1) + return wsUrl +} + +func (c *ImageRunner) OpenAsyncEventsWebsocket(ctx context.Context, id string) (*websocket.Conn, error) { + // dummy request so that we build basic auth header consistently + dummy_url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.URL, id) + req, err := http.NewRequest("GET", dummy_url, nil) + if err != nil { + panic(err) + } + req.SetBasicAuth(c.Creds.Username, c.Creds.AccessKey) + log.Info().Str("c.Creds.Username", c.Creds.Username).Str("c.Creds.AccessKey", c.Creds.AccessKey).Msg("AKAK1 OpenAsyncEventsWebsocket") + + url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.getWebsocketUrl(), id) + headers := http.Header{} + headers.Add("Authorization", req.Header.Get("Authorization")) + ws, resp, err := websocket.DefaultDialer.Dial( + url, headers) + if err != nil { + if resp != nil { + log.Error().Err(err).Int("http status", resp.StatusCode).Msg("Could not open async events websocket") + } else { + log.Error().Err(err).Msg("Could not open async events websocket") + } + return nil, err + } + return ws, nil +} + func (c *ImageRunner) doGetStr(ctx context.Context, url string) (string, error) { urlReq, err := NewRetryableRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { diff --git a/internal/imagerunner/async.go b/internal/imagerunner/async.go new file mode 100644 index 000000000..5aa72aede --- /dev/null +++ b/internal/imagerunner/async.go @@ -0,0 +1,202 @@ +package imagerunner + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/santhosh-tekuri/jsonschema/v5" +) + +var SCHEMA = ` +{ + "properties": { + "kind": { + "enum": [ + "notice", + "log", + "ping" + ] + }, + "runnerID": { + "type": "string" + } + }, + "allOf": [ + { + "if": { + "properties": { + "kind": { + "const": "log" + } + } + }, + "then": { + "properties": { + "lines": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "containerName": { + "type": "string" + }, + "message": { + "type": "string" + } + } + } + } + }, + "additionalProperties": true + } + }, + { + "if": { + "properties": { + "kind": { + "const": "notice" + } + } + }, + "then": { + "properties": { + "severity": { + "enum": [ + "info", + "warning", + "error" + ] + }, + "message": { + "type": "string" + } + }, + "additionalProperties": true + } + }, + { + "if": { + "properties": { + "kind": { + "const": "ping" + } + } + }, + "then": { + "properties": { + "message": { + "type": "string" + } + }, + "additionalProperties": true + } + } + + ], + "additionalProperties": true +} +` + +const ( + NOTICE = "notice" + LOG = "log" + PING = "ping" +) + +type AsyncEventI interface { + GetKind() string + GetRunnerID() string +} + +type AsyncEvent struct { + Kind string `json:"kind"` + RunnerID string `json:"runnerID"` +} + +func (a *AsyncEvent) GetKind() string { + return a.Kind +} + +func (a *AsyncEvent) GetRunnerID() string { + return a.RunnerID +} + +type LogLine struct { + Id string `json:"id"` + ContainerName string `json:"containerName"` + Message string `json:"message"` +} + +type LogEvent struct { + AsyncEvent + Lines []LogLine `json:"lines"` +} + +type PingEvent struct { + AsyncEvent + Message string `json:"message"` +} + +type NoticeEvent struct { + AsyncEvent + Severity string `json:"severity"` + Message string `json:"message"` +} + +type AsyncEventManagerI interface { + ParseEvent(event string) (AsyncEventI, error) +} + +type AsyncEventManager struct { + schema *jsonschema.Schema +} + +func NewAsyncEventManager() (*AsyncEventManager, error) { + schema, err := jsonschema.CompileString("schema.json", SCHEMA) + if err != nil { + return nil, err + } + + asyncEventManager := AsyncEventManager{ + schema: schema, + } + + return &asyncEventManager, nil +} + +func (a *AsyncEventManager) ParseEvent(event string) (AsyncEventI, error) { + err := a.schema.Validate(event) + if err != nil { + return nil, err + } + v := AsyncEvent{} + if err := json.Unmarshal([]byte(event), &v); err != nil { + log.Fatal(err) + } + + if v.GetKind() == LOG { + logEvent := LogEvent{} + if err := json.Unmarshal([]byte(event), &logEvent); err != nil { + log.Fatal(err) + } + return &logEvent, nil + } else if v.GetKind() == NOTICE { + noticeEvent := NoticeEvent{} + if err := json.Unmarshal([]byte(event), ¬iceEvent); err != nil { + log.Fatal(err) + } + return ¬iceEvent, nil + } else if v.GetKind() == PING { + pingEvent := PingEvent{} + if err := json.Unmarshal([]byte(event), &pingEvent); err != nil { + log.Fatal(err) + } + return &pingEvent, nil + } else { + return nil, fmt.Errorf("unknown event type: %s", v.GetKind()) + } +} diff --git a/internal/imagerunner/async_test.go b/internal/imagerunner/async_test.go new file mode 100644 index 000000000..93f3ff20b --- /dev/null +++ b/internal/imagerunner/async_test.go @@ -0,0 +1,79 @@ +package imagerunner + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLogEvent(t *testing.T) { + manager, err := NewAsyncEventManager() + assert.NoError(t, err) + + eventMsg := `{ + "kind": "log", + "runnerID": "myrunner", + "lines": [ + { + "id": "1", + "containerName": "mycontainer", + "message": "hello" + } + ] + }` + + event, err := manager.ParseEvent(eventMsg) + assert.NoError(t, err) + assert.Equal(t, "log", event.GetKind()) + assert.Equal(t, "myrunner", event.GetRunnerID()) + + logEvent, ok := event.(*LogEvent) + assert.True(t, ok) + assert.Len(t, logEvent.Lines, 1) + assert.Equal(t, "1", logEvent.Lines[0].Id) + assert.Equal(t, "hello", logEvent.Lines[0].Message) + assert.Equal(t, "mycontainer", logEvent.Lines[0].ContainerName) +} + +func TestNoticeEvent(t *testing.T) { + manager, err := NewAsyncEventManager() + assert.NoError(t, err) + + eventMsg := `{ + "kind": "notice", + "runnerID": "myrunner", + "severity": "info", + "message": "hello" + }` + + event, err := manager.ParseEvent(eventMsg) + assert.NoError(t, err) + assert.Equal(t, "notice", event.GetKind()) + assert.Equal(t, "myrunner", event.GetRunnerID()) + + noticeEvent, ok := event.(*NoticeEvent) + assert.True(t, ok) + assert.Equal(t, "notice", noticeEvent.GetKind()) + assert.Equal(t, "hello", noticeEvent.Message) + assert.Equal(t, "info", noticeEvent.Severity) +} + +func TestPingEvent(t *testing.T) { + manager, err := NewAsyncEventManager() + assert.NoError(t, err) + + eventMsg := `{ + "kind": "ping", + "runnerID": "myrunner", + "message": "hello" + }` + + event, err := manager.ParseEvent(eventMsg) + assert.NoError(t, err) + assert.Equal(t, "ping", event.GetKind()) + assert.Equal(t, "myrunner", event.GetRunnerID()) + + pingEvent, ok := event.(*PingEvent) + assert.True(t, ok) + assert.Equal(t, "hello", pingEvent.Message) +} diff --git a/internal/saucecloud/imagerunner.go b/internal/saucecloud/imagerunner.go index 7a364f4b8..e11c060c6 100644 --- a/internal/saucecloud/imagerunner.go +++ b/internal/saucecloud/imagerunner.go @@ -13,6 +13,7 @@ import ( "reflect" "time" + "github.com/gorilla/websocket" "github.com/rs/zerolog/log" "github.com/ryanuber/go-glob" szip "github.com/saucelabs/saucectl/internal/archive/zip" @@ -30,6 +31,7 @@ type ImageRunner interface { StopRun(ctx context.Context, id string) error DownloadArtifacts(ctx context.Context, id string) (io.ReadCloser, error) GetLogs(ctx context.Context, id string) (string, error) + OpenAsyncEventsWebsocket(ctx context.Context, id string) (*websocket.Conn, error) } type SuiteTimeoutError struct { @@ -49,12 +51,25 @@ type ImgRunner struct { Reporters []report.Reporter - Async bool + asyncEventManager imagerunner.AsyncEventManagerI + Async bool ctx context.Context cancel context.CancelFunc } +func NewImgRunner(project imagerunner.Project, runnerService ImageRunner, tunnelService tunnel.Service, + reporters []report.Reporter, asyncEventManager imagerunner.AsyncEventManagerI, async bool) *ImgRunner { + return &ImgRunner{ + Project: project, + RunnerService: runnerService, + TunnelService: tunnelService, + Reporters: reporters, + asyncEventManager: asyncEventManager, + Async: async, + } +} + type execResult struct { name string runID string @@ -269,6 +284,12 @@ func (r *ImgRunner) runSuite(suite imagerunner.Suite) (imagerunner.Runner, error return runner, nil } + go func() { + err := r.HandleAsyncEvents(ctx, runner.ID) + // TODO: handle error better + log.Err(err).Msg("Async event handler failed.") + }() + var run imagerunner.Runner run, err = r.PollRun(ctx, runner.ID, runner.Status) if errors.Is(err, context.DeadlineExceeded) && ctx.Err() != nil { @@ -399,6 +420,39 @@ func (r *ImgRunner) PollRun(ctx context.Context, id string, lastStatus string) ( } } +func (r *ImgRunner) HandleAsyncEvents(ctx context.Context, id string) error { + conn, err := r.RunnerService.OpenAsyncEventsWebsocket(ctx, id) + if err != nil { + return err + } + defer conn.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + _, msg, err := conn.ReadMessage() + if err != nil { + return err + } + event, err := r.asyncEventManager.ParseEvent(string(msg)) + if err != nil { + return err + } + if event.GetKind() == "log" { + logEvent := event.(*imagerunner.LogEvent) + for _, line := range logEvent.Lines { + log.Info().Msgf("[%s, %s] %s", line.ContainerName, line.Id, line.Message) + } + } else if event.GetKind() == "notice" { + noticeEvent := event.(*imagerunner.NoticeEvent) + log.Info().Msgf("[%s] %s", noticeEvent.Severity, noticeEvent.Message) + } + } + } +} + // DownloadArtifact downloads a zipped archive of artifacts and extracts the required files. func (r *ImgRunner) DownloadArtifacts(runnerID, suiteName, status string, passed bool) []string { if r.Async ||