Skip to content

Commit

Permalink
Refactor 'engine' portion of agent
Browse files Browse the repository at this point in the history
This solves many problems with task transitions (again!). It also
provides significantly more detailed error reasons to the backend.

Relates to aws#52, aws#31, and aws#24
  • Loading branch information
euank committed Apr 28, 2015
1 parent 5b1b10f commit 03364fd
Show file tree
Hide file tree
Showing 31 changed files with 1,108 additions and 890 deletions.
25 changes: 8 additions & 17 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func payloadMessageHandler(cs acsclient.ClientServer, cluster, containerInstance
// If there was an error converting these from acs to engine
// tasks, report to the backend that they're not running and
// give a suitable reason
badtaskChanges := make([]*api.ContainerStateChange, 0, len(task.Containers))
for _, container := range task.Containers {
if container == nil {
log.Error("Recieved task with nil containers", "arn", *task.Arn)
Expand All @@ -110,26 +109,18 @@ func payloadMessageHandler(cs acsclient.ClientServer, cluster, containerInstance
log.Error("Recieved task with nil container name", "arn", *task.Arn)
continue
}
change := api.ContainerStateChange{
eventhandler.AddContainerEvent(api.ContainerStateChange{
TaskArn: *task.Arn,
Status: api.ContainerStopped,
Reason: "Error loading task: " + err.Error(),
Container: &api.Container{},
Reason: "UnrecognizedACSTask: Error loading task: " + err.Error(),
ContainerName: *container.Name,
}
badtaskChanges = append(badtaskChanges, &change)
}
if len(badtaskChanges) == 0 {
return
}
// The last container stop also brings down the task
taskChange := badtaskChanges[len(badtaskChanges)-1]
taskChange.TaskStatus = api.TaskStopped
taskChange.Task = &api.Task{}

for _, change := range badtaskChanges {
eventhandler.AddTaskEvent(*change, client)
}, client)
}
eventhandler.AddTaskEvent(api.TaskStateChange{
TaskArn: *task.Arn,
Status: api.TaskStopped,
Reason: "UnrecognizedACSTask: Error loading task: " + err.Error(),
}, client)
return
}
// Else, no error converting, add to engine
Expand Down
2 changes: 1 addition & 1 deletion agent/acs/update_handler/os/mock/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package mock_os

import (
gomock "code.google.com/p/gomock/gomock"
io "io"
os "os"
gomock "code.google.com/p/gomock/gomock"
)

// Mock of FileSystem interface
Expand Down
11 changes: 4 additions & 7 deletions agent/api/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var log = logger.ForModule("api client")
type ECSClient interface {
CredentialProvider() credentials.AWSCredentialProvider
RegisterContainerInstance() (string, error)
SubmitTaskStateChange(change ContainerStateChange) utils.RetriableError
SubmitTaskStateChange(change TaskStateChange) utils.RetriableError
SubmitContainerStateChange(change ContainerStateChange) utils.RetriableError
DiscoverPollEndpoint(containerInstanceArn string) (string, error)
}
Expand Down Expand Up @@ -255,16 +255,13 @@ func (client *ApiECSClient) registerContainerInstance(clusterRef string) (string
return *resp.ContainerInstance().ContainerInstanceArn(), nil
}

func (client *ApiECSClient) SubmitTaskStateChange(change ContainerStateChange) utils.RetriableError {
if change.TaskStatus == TaskStatusNone {
func (client *ApiECSClient) SubmitTaskStateChange(change TaskStateChange) utils.RetriableError {
if change.Status == TaskStatusNone {
log.Warn("SubmitTaskStateChange called with an invalid change", "change", change)
return NewAPIError(errors.New("SubmitTaskStateChange called with an invalid change"))
}

stat := change.TaskStatus.String()
if stat == "DEAD" {
stat = "STOPPED"
}
stat := change.Status.String()
if stat != "STOPPED" && stat != "RUNNING" {
log.Debug("Not submitting unsupported upstream task state", "state", stat)
// Not really an error
Expand Down
8 changes: 8 additions & 0 deletions agent/api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,11 @@ func (apiErr *APIError) Retry() bool {
func (apiErr *APIError) Error() string {
return apiErr.err.Error()
}

// badVolumeError is the error returned when a container's volume
// configuration cannot be used; msg holds the human-readable description.
type badVolumeError struct {
	msg string
}

// Error returns the message the error was constructed with.
func (e *badVolumeError) Error() string {
	return e.msg
}

// ErrorName returns the fixed, short name identifying this class of error.
func (e *badVolumeError) ErrorName() string {
	return "InvalidVolumeError"
}

// Retry always reports false: this error is never marked as retriable.
func (e *badVolumeError) Retry() bool {
	return false
}
26 changes: 22 additions & 4 deletions agent/api/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,23 @@ func (ts *TaskStatus) UnmarshalJSON(b []byte) error {
return nil
}
if b[0] != '"' || b[len(b)-1] != '"' {
*ts = TaskStatusUnknown
*ts = TaskStatusNone
return errors.New("TaskStatus must be a string or null")
}
strStatus := string(b[1 : len(b)-1])
// 'UNKNOWN' and 'DEAD' for Compatibility with v1.0.0 state files
if strStatus == "UNKNOWN" {
*ts = TaskStatusNone
return nil
}
if strStatus == "DEAD" {
*ts = TaskStopped
return nil
}

stat, ok := taskStatusMap[strStatus]
if !ok {
*ts = TaskStatusUnknown
*ts = TaskStatusNone
return errors.New("Unrecognized TaskStatus")
}
*ts = stat
Expand All @@ -54,14 +63,23 @@ func (cs *ContainerStatus) UnmarshalJSON(b []byte) error {
return nil
}
if b[0] != '"' || b[len(b)-1] != '"' {
*cs = ContainerStatusUnknown
*cs = ContainerStatusNone
return errors.New("ContainerStatus must be a string or null; Got " + string(b))
}
strStatus := string(b[1 : len(b)-1])
// 'UNKNOWN' and 'DEAD' for Compatibility with v1.0.0 state files
if strStatus == "UNKNOWN" {
*cs = ContainerStatusNone
return nil
}
if strStatus == "DEAD" {
*cs = ContainerStopped
return nil
}

stat, ok := containerStatusMap[strStatus]
if !ok {
*cs = ContainerStatusUnknown
*cs = ContainerStatusNone
return errors.New("Unrecognized ContainerStatus")
}
*cs = stat
Expand Down
20 changes: 12 additions & 8 deletions agent/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ package api

var taskStatusMap = map[string]TaskStatus{
"NONE": TaskStatusNone,
"UNKNOWN": TaskStatusUnknown,
"CREATED": TaskCreated,
"RUNNING": TaskRunning,
"STOPPED": TaskStopped,
"DEAD": TaskStopped,
}

func (ts *TaskStatus) String() string {
Expand All @@ -28,7 +26,7 @@ func (ts *TaskStatus) String() string {
return k
}
}
return "UNKNOWN"
return "NONE"
}

// Mapping task status in the agent to that in the backend
Expand All @@ -42,14 +40,16 @@ func (ts *TaskStatus) BackendStatus() string {
return "PENDING"
}

// BackendRecognized reports whether the status is TaskRunning or TaskStopped;
// every other status yields false.
func (ts *TaskStatus) BackendRecognized() bool {
	switch *ts {
	case TaskRunning, TaskStopped:
		return true
	default:
		return false
	}
}

var containerStatusMap = map[string]ContainerStatus{
"NONE": ContainerStatusNone,
"UNKNOWN": ContainerStatusUnknown,
"PULLED": ContainerPulled,
"CREATED": ContainerCreated,
"RUNNING": ContainerRunning,
"STOPPED": ContainerStopped,
"DEAD": ContainerStopped,
}

func (cs *ContainerStatus) String() string {
Expand All @@ -58,7 +58,7 @@ func (cs *ContainerStatus) String() string {
return k
}
}
return "UNKNOWN"
return "NONE"
}

func (cs *ContainerStatus) TaskStatus() TaskStatus {
Expand All @@ -72,7 +72,7 @@ func (cs *ContainerStatus) TaskStatus() TaskStatus {
case ContainerStopped:
return TaskStopped
}
return TaskStatusUnknown
return TaskStatusNone
}

func (ts *TaskStatus) ContainerStatus() ContainerStatus {
Expand All @@ -86,7 +86,11 @@ func (ts *TaskStatus) ContainerStatus() ContainerStatus {
case TaskStopped:
return ContainerStopped
}
return ContainerStatusUnknown
return ContainerStatusNone
}

// BackendRecognized reports whether the status is ContainerRunning or
// ContainerStopped; every other status yields false.
func (cs *ContainerStatus) BackendRecognized() bool {
	switch *cs {
	case ContainerRunning, ContainerStopped:
		return true
	default:
		return false
	}
}

func (cs *ContainerStatus) Terminal() bool {
Expand Down
130 changes: 64 additions & 66 deletions agent/api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,87 +131,41 @@ func (task *Task) UpdateMountPoints(cont *Container, vols map[string]string) {
}
}

// InferContainerDesiredStatus ensures that all container's desired statuses are
// compatible with whatever status the task desires to be at or is at.
// This is used both to initialize container statuses of new tasks and to force
// auxilery containers into terminal states (e.g. the essential containers died
// already)
func (task *Task) InferContainerDesiredStatus() {
// UpdateContainerDesiredStatus raises each container's desired status to the
// task's desired status whenever the container's desired status is lower
func (task *Task) UpdateContainerDesiredStatus() {
for _, c := range task.Containers {
c.DesiredStatus = task.maxStatus().ContainerStatus()
}
}

func (task *Task) maxStatus() *TaskStatus {
if task.KnownStatus > task.DesiredStatus {
return &task.KnownStatus
if c.DesiredStatus < task.DesiredStatus.ContainerStatus() {
c.DesiredStatus = task.DesiredStatus.ContainerStatus()
}
}
return &task.DesiredStatus
}

// UpdateTaskState updates the given task's status based on its container's status.
// For example, if an essential container stops, it will set the task to
// stopped.
// UpdateTaskKnownStatus updates the given task's status based on its containers' statuses.
// It updates to the minimum of all containers no matter what.
// It returns a TaskStatus indicating what change occurred or TaskStatusNone if
// there was no change
func (task *Task) UpdateTaskStatus() (newStatus TaskStatus) {
//The task is the minimum status of all its essential containers unless the
//status is terminal in which case it's that status
log.Debug("Updating task", "task", task)
func (task *Task) UpdateTaskKnownStatus() (newStatus TaskStatus) {
llog := log.New("task", task)
llog.Debug("Updating task")
defer func() {
if newStatus != TaskStatusNone {
task.KnownTime = time.Now()
}
}()

// Set to a large 'impossible' status that can't be the min
essentialContainersEarliestStatus := ContainerZombie
allContainersEarliestStatus := ContainerZombie
earliestStatus := ContainerZombie
for _, cont := range task.Containers {
log.Debug("On container", "cont", cont)
if cont.KnownStatus < allContainersEarliestStatus {
allContainersEarliestStatus = cont.KnownStatus
}
if !cont.Essential {
continue
}

if cont.KnownStatus.Terminal() && !task.KnownStatus.Terminal() {
// Any essential & terminal container brings down the whole task
task.KnownStatus = TaskStopped
return task.KnownStatus
if cont.KnownStatus < earliestStatus {
earliestStatus = cont.KnownStatus
}
// Non-terminal
if cont.KnownStatus < essentialContainersEarliestStatus {
essentialContainersEarliestStatus = cont.KnownStatus
}
}

if essentialContainersEarliestStatus == ContainerZombie {
log.Warn("Task with no essential containers; all properly formed tasks should have at least one essential container", "task", task)

// If there are no essential containers, assume the container with the
// earliest status is essential and proceed.
essentialContainersEarliestStatus = allContainersEarliestStatus
}

log.Debug("Earliest essential status is " + essentialContainersEarliestStatus.String())

if essentialContainersEarliestStatus == ContainerCreated {
if task.KnownStatus < TaskCreated {
task.KnownStatus = TaskCreated
return task.KnownStatus
}
} else if essentialContainersEarliestStatus == ContainerRunning {
if task.KnownStatus < TaskRunning {
task.KnownStatus = TaskRunning
return task.KnownStatus
}
} else if essentialContainersEarliestStatus == ContainerStopped {
if task.KnownStatus < TaskStopped {
task.KnownStatus = TaskStopped
return task.KnownStatus
}
llog.Debug("Earliest status is " + earliestStatus.String())
if task.KnownStatus < earliestStatus.TaskStatus() {
task.KnownStatus = earliestStatus.TaskStatus()
return task.KnownStatus
}
return TaskStatusNone
}
Expand Down Expand Up @@ -290,15 +244,15 @@ func (task *Task) dockerConfigVolumes(container *Container) (map[string]struct{}
for _, m := range container.MountPoints {
vol, exists := task.HostVolumeByName(m.SourceVolume)
if !exists {
return nil, errors.New("Container references non-existent volume")
return nil, &badVolumeError{"Container " + container.Name + " in task " + task.Arn + " references invalid volume " + m.SourceVolume}
}
// you can handle most volume mount types in the HostConfig at run-time;
// empty mounts are created by docker at create-time (Config) so set
// them here.
if container.Name == emptyHostVolumeName && container.IsInternal {
_, ok := vol.(*EmptyHostVolume)
if !ok {
return nil, errors.New("invalid state; internal emptyvolume container with non empty volume")
return nil, &badVolumeError{"Empty volume container in task " + task.Arn + " was the wrong type"}
}

volumeMap[m.ContainerPath] = struct{}{}
Expand Down Expand Up @@ -435,3 +389,47 @@ func TaskFromACS(task *ecsacs.Task) (*Task, error) {
}
return outTask, nil
}

// SetDesiredStatus moves the task's desired status forward to status.
//
// A status lower than the current desired status is ignored with a warning,
// and an equal status is logged as redundant. Only a strictly higher status
// is stored, after which UpdateContainerDesiredStatus is called.
func (task *Task) SetDesiredStatus(status TaskStatus) {
	llog := log.New("task", task, "status", status.String())

	switch {
	case status < task.DesiredStatus:
		// Desired status only ever moves forward.
		llog.Warn("Recieved event asking task to move backwards in desired; ignoring")
	case status == task.DesiredStatus:
		llog.Info("Redundant backend event; desired = desired")
	default:
		llog.Debug("Updating task desired status")
		task.DesiredStatus = status
		task.UpdateContainerDesiredStatus()
	}
}

// UpdateTaskDesiredStatus sets the task's desired status to TaskStopped when
// any essential container has a terminal known or desired status. If no
// essential container is in that position, the desired status is left as is.
func (task *Task) UpdateTaskDesiredStatus() {
	llog := log.New("task", task)
	llog.Debug("Updating task")

	for _, c := range task.Containers {
		// Non-essential containers never affect the task's desired status.
		if !c.Essential {
			continue
		}
		// Skip essential containers that are neither stopped nor asked to stop.
		if !c.KnownStatus.Terminal() && !c.DesiredStatus.Terminal() {
			continue
		}
		llog.Debug("Updating task desired status to stopped", "container", c.Name)
		task.DesiredStatus = TaskStopped
	}
}

// UpdateStatus refreshes the task's known status via UpdateTaskKnownStatus and
// then recomputes desired statuses via UpdateDesiredStatus.
// It returns true when UpdateTaskKnownStatus reported a change, i.e. returned
// anything other than TaskStatusNone.
func (t *Task) UpdateStatus() bool {
	knownChanged := t.UpdateTaskKnownStatus() != TaskStatusNone
	// Desired statuses are recomputed after the known status, since they can
	// change based on a new known status.
	t.UpdateDesiredStatus()
	return knownChanged
}

// UpdateDesiredStatus recomputes the task's desired status first and then
// updates its containers' desired statuses, in that order.
func (task *Task) UpdateDesiredStatus() {
	task.UpdateTaskDesiredStatus()
	task.UpdateContainerDesiredStatus()
}
Loading

0 comments on commit 03364fd

Please sign in to comment.