Skip to content

Commit

Permalink
adds async events
Browse files Browse the repository at this point in the history
  • Loading branch information
sebv committed Dec 5, 2023
1 parent b5c3b6f commit d24fc55
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 7 deletions.
13 changes: 7 additions & 6 deletions internal/cmd/run/imagerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ func runImageRunner(cmd *cobra.Command) (int, error) {
imageRunnerClient := http.NewImageRunner(regio.APIBaseURL(), creds, imgExecTimeout)
restoClient := http.NewResto(regio.APIBaseURL(), creds.Username, creds.AccessKey, 0)

r := saucecloud.ImgRunner{
Project: p,
RunnerService: &imageRunnerClient,
TunnelService: &restoClient,
Reporters: reporters,
Async: gFlags.async,
asyncEventManager, err := imagerunner.NewAsyncEventManager()
if err != nil {
return 1, err
}

r := saucecloud.NewImgRunner(p, &imageRunnerClient, &restoClient, reporters,
asyncEventManager, gFlags.async)

return r.RunProject()
}

Expand Down
36 changes: 36 additions & 0 deletions internal/http/imagerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"strings"
"time"

"github.com/gorilla/websocket"
"github.com/hashicorp/go-retryablehttp"
"github.com/rs/zerolog/log"
"github.com/saucelabs/saucectl/internal/iam"
"github.com/saucelabs/saucectl/internal/imagerunner"
)
Expand Down Expand Up @@ -204,6 +206,40 @@ func (c *ImageRunner) GetLogs(ctx context.Context, id string) (string, error) {
return c.doGetStr(ctx, urlResponse.URL)
}

func (c *ImageRunner) getWebsocketUrl() string {

wsUrl := c.URL
wsUrl = strings.Replace(wsUrl, "https://", "wss://", 1)
wsUrl = strings.Replace(wsUrl, "http://", "ws://", 1)
return wsUrl
}

func (c *ImageRunner) OpenAsyncEventsWebsocket(ctx context.Context, id string) (*websocket.Conn, error) {
// dummy request so that we build basic auth header consistently
dummy_url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.URL, id)
req, err := http.NewRequest("GET", dummy_url, nil)
if err != nil {
panic(err)
}
req.SetBasicAuth(c.Creds.Username, c.Creds.AccessKey)
log.Info().Str("c.Creds.Username", c.Creds.Username).Str("c.Creds.AccessKey", c.Creds.AccessKey).Msg("AKAK1 OpenAsyncEventsWebsocket")

url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.getWebsocketUrl(), id)
headers := http.Header{}
headers.Add("Authorization", req.Header.Get("Authorization"))
ws, resp, err := websocket.DefaultDialer.Dial(
url, headers)
if err != nil {
if resp != nil {
log.Error().Err(err).Int("http status", resp.StatusCode).Msg("Could not open async events websocket")
} else {
log.Error().Err(err).Msg("Could not open async events websocket")
}
return nil, err
}
return ws, nil
}

func (c *ImageRunner) doGetStr(ctx context.Context, url string) (string, error) {
urlReq, err := NewRetryableRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand Down
202 changes: 202 additions & 0 deletions internal/imagerunner/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package imagerunner

import (
"encoding/json"
"fmt"
"log"

"github.com/santhosh-tekuri/jsonschema/v5"
)

var SCHEMA = `
{
"properties": {
"kind": {
"enum": [
"notice",
"log",
"ping"
]
},
"runnerID": {
"type": "string"
}
},
"allOf": [
{
"if": {
"properties": {
"kind": {
"const": "log"
}
}
},
"then": {
"properties": {
"lines": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"containerName": {
"type": "string"
},
"message": {
"type": "string"
}
}
}
}
},
"additionalProperties": true
}
},
{
"if": {
"properties": {
"kind": {
"const": "notice"
}
}
},
"then": {
"properties": {
"severity": {
"enum": [
"info",
"warning",
"error"
]
},
"message": {
"type": "string"
}
},
"additionalProperties": true
}
},
{
"if": {
"properties": {
"kind": {
"const": "ping"
}
}
},
"then": {
"properties": {
"message": {
"type": "string"
}
},
"additionalProperties": true
}
}
],
"additionalProperties": true
}
`

const (
NOTICE = "notice"
LOG = "log"
PING = "ping"
)

type AsyncEventI interface {
GetKind() string
GetRunnerID() string
}

type AsyncEvent struct {
Kind string `json:"kind"`
RunnerID string `json:"runnerID"`
}

func (a *AsyncEvent) GetKind() string {
return a.Kind
}

func (a *AsyncEvent) GetRunnerID() string {
return a.RunnerID
}

type LogLine struct {
Id string `json:"id"`
ContainerName string `json:"containerName"`
Message string `json:"message"`
}

type LogEvent struct {
AsyncEvent
Lines []LogLine `json:"lines"`
}

type PingEvent struct {
AsyncEvent
Message string `json:"message"`
}

type NoticeEvent struct {
AsyncEvent
Severity string `json:"severity"`
Message string `json:"message"`
}

type AsyncEventManagerI interface {
ParseEvent(event string) (AsyncEventI, error)
}

type AsyncEventManager struct {
schema *jsonschema.Schema
}

func NewAsyncEventManager() (*AsyncEventManager, error) {
schema, err := jsonschema.CompileString("schema.json", SCHEMA)
if err != nil {
return nil, err
}

asyncEventManager := AsyncEventManager{
schema: schema,
}

return &asyncEventManager, nil
}

func (a *AsyncEventManager) ParseEvent(event string) (AsyncEventI, error) {
err := a.schema.Validate(event)
if err != nil {
return nil, err
}
v := AsyncEvent{}
if err := json.Unmarshal([]byte(event), &v); err != nil {
log.Fatal(err)
}

if v.GetKind() == LOG {
logEvent := LogEvent{}
if err := json.Unmarshal([]byte(event), &logEvent); err != nil {
log.Fatal(err)
}
return &logEvent, nil
} else if v.GetKind() == NOTICE {
noticeEvent := NoticeEvent{}
if err := json.Unmarshal([]byte(event), &noticeEvent); err != nil {
log.Fatal(err)
}
return &noticeEvent, nil
} else if v.GetKind() == PING {
pingEvent := PingEvent{}
if err := json.Unmarshal([]byte(event), &pingEvent); err != nil {
log.Fatal(err)
}
return &pingEvent, nil
} else {
return nil, fmt.Errorf("unknown event type: %s", v.GetKind())
}
}
79 changes: 79 additions & 0 deletions internal/imagerunner/async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package imagerunner

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLogEvent(t *testing.T) {
manager, err := NewAsyncEventManager()
assert.NoError(t, err)

eventMsg := `{
"kind": "log",
"runnerID": "myrunner",
"lines": [
{
"id": "1",
"containerName": "mycontainer",
"message": "hello"
}
]
}`

event, err := manager.ParseEvent(eventMsg)
assert.NoError(t, err)
assert.Equal(t, "log", event.GetKind())
assert.Equal(t, "myrunner", event.GetRunnerID())

logEvent, ok := event.(*LogEvent)
assert.True(t, ok)
assert.Len(t, logEvent.Lines, 1)
assert.Equal(t, "1", logEvent.Lines[0].Id)
assert.Equal(t, "hello", logEvent.Lines[0].Message)
assert.Equal(t, "mycontainer", logEvent.Lines[0].ContainerName)
}

func TestNoticeEvent(t *testing.T) {
manager, err := NewAsyncEventManager()
assert.NoError(t, err)

eventMsg := `{
"kind": "notice",
"runnerID": "myrunner",
"severity": "info",
"message": "hello"
}`

event, err := manager.ParseEvent(eventMsg)
assert.NoError(t, err)
assert.Equal(t, "notice", event.GetKind())
assert.Equal(t, "myrunner", event.GetRunnerID())

noticeEvent, ok := event.(*NoticeEvent)
assert.True(t, ok)
assert.Equal(t, "notice", noticeEvent.GetKind())
assert.Equal(t, "hello", noticeEvent.Message)
assert.Equal(t, "info", noticeEvent.Severity)
}

func TestPingEvent(t *testing.T) {
manager, err := NewAsyncEventManager()
assert.NoError(t, err)

eventMsg := `{
"kind": "ping",
"runnerID": "myrunner",
"message": "hello"
}`

event, err := manager.ParseEvent(eventMsg)
assert.NoError(t, err)
assert.Equal(t, "ping", event.GetKind())
assert.Equal(t, "myrunner", event.GetRunnerID())

pingEvent, ok := event.(*PingEvent)
assert.True(t, ok)
assert.Equal(t, "hello", pingEvent.Message)
}
Loading

0 comments on commit d24fc55

Please sign in to comment.