From ffb7b11ae0e39d616956633c3345ff29a7e9a4f6 Mon Sep 17 00:00:00 2001 From: Euan Date: Tue, 21 Apr 2015 20:54:13 +0000 Subject: [PATCH] Refactor 'engine' portion of agent This solves many problems with task transitions (again!). It also provides significantly more detailed error reasons to the backend. Relates to #52, #31, and #24 --- agent/acs/handler/acs_handler.go | 25 +- .../acs/update_handler/os/mock/filesystem.go | 2 +- agent/api/api_client.go | 11 +- agent/api/errors.go | 8 + agent/api/json.go | 26 +- agent/api/status.go | 20 +- agent/api/task.go | 130 ++-- agent/api/types.go | 20 +- agent/engine/docker_container_engine.go | 144 ++-- .../docker_container_engine_integ_test.go | 40 -- agent/engine/docker_task_engine.go | 669 ++++++++++-------- .../engine/dockerauth/ecs/ecs_credentials.go | 1 + .../dockerstate/docker_task_engine_state.go | 19 +- agent/engine/dockerstate/dockerstate_test.go | 6 +- agent/engine/dockerstate/json.go | 2 +- .../engine/dockerstate/testutils/json_test.go | 2 +- agent/engine/engine_integ_test.go | 481 +++++++------ agent/engine/errors.go | 1 + agent/engine/interface.go | 2 +- agent/engine/mocks/engine_mocks.go | 26 +- agent/engine/types.go | 14 +- agent/eventhandler/handler.go | 18 +- agent/eventhandler/handler_test.go | 155 ++-- agent/eventhandler/task_handler.go | 73 +- agent/eventhandler/task_handler_types.go | 53 +- agent/handlers/v1_handlers_test.go | 6 +- agent/statemanager/state_manager_test.go | 2 +- agent/utils/ttime/test_time.go | 11 + agent/utils/ttime/ttime.go | 11 + agent/utils/ttime/ttime_test.go | 19 + misc/volumes-test/Dockerfile | 1 - 31 files changed, 1108 insertions(+), 890 deletions(-) delete mode 100644 agent/engine/docker_container_engine_integ_test.go create mode 100644 agent/engine/errors.go diff --git a/agent/acs/handler/acs_handler.go b/agent/acs/handler/acs_handler.go index c56925b3b2c..a92862bd4fb 100644 --- a/agent/acs/handler/acs_handler.go +++ b/agent/acs/handler/acs_handler.go @@ -100,7 +100,6 @@ func payloadMessageHandler(cs acsclient.ClientServer, cluster, containerInstance // If there was an error converting these from acs to engine // tasks, report to the backend that they're not running and // give a suitable reason - badtaskChanges := make([]*api.ContainerStateChange, 0, len(task.Containers)) for _, container := range task.Containers { if container == nil { log.Error("Recieved task with nil containers", "arn", *task.Arn) @@ -110,26 +109,18 @@ func payloadMessageHandler(cs acsclient.ClientServer, cluster, containerInstance log.Error("Recieved task with nil container name", "arn", *task.Arn) continue } - change := api.ContainerStateChange{ + eventhandler.AddContainerEvent(api.ContainerStateChange{ TaskArn: *task.Arn, Status: api.ContainerStopped, - Reason: "Error loading task: " + err.Error(), - Container: &api.Container{}, + Reason: "UnrecognizedACSTask: Error loading task: " + err.Error(), ContainerName: *container.Name, - } - badtaskChanges = append(badtaskChanges, &change) - } - if len(badtaskChanges) == 0 { - return - } - // The last container stop also brings down the task - taskChange := badtaskChanges[len(badtaskChanges)-1] - taskChange.TaskStatus = api.TaskStopped - taskChange.Task = &api.Task{} - - for _, change := range badtaskChanges { - eventhandler.AddTaskEvent(*change, client) + }, client) } + eventhandler.AddTaskEvent(api.TaskStateChange{ + TaskArn: *task.Arn, + Status: api.TaskStopped, + Reason: "UnrecognizedACSTask: Error loading task: " + err.Error(), + }, client) return } // Else, no error converting, 
add to engine diff --git a/agent/acs/update_handler/os/mock/filesystem.go b/agent/acs/update_handler/os/mock/filesystem.go index 2bcca46df09..ca84987a21b 100644 --- a/agent/acs/update_handler/os/mock/filesystem.go +++ b/agent/acs/update_handler/os/mock/filesystem.go @@ -17,9 +17,9 @@ package mock_os import ( - gomock "code.google.com/p/gomock/gomock" io "io" os "os" + gomock "code.google.com/p/gomock/gomock" ) // Mock of FileSystem interface diff --git a/agent/api/api_client.go b/agent/api/api_client.go index 712b9b3f18f..b531d6ac6d9 100644 --- a/agent/api/api_client.go +++ b/agent/api/api_client.go @@ -43,7 +43,7 @@ type ECSClient interface { RegisterContainerInstance() (string, error) // SubmitTaskStateChange sends a state change and returns an error // indicating if it was submitted - SubmitTaskStateChange(change ContainerStateChange) utils.RetriableError + SubmitTaskStateChange(change TaskStateChange) utils.RetriableError // SubmitContainerStateChange sends a state change and returns an error // indicating if it was submitted SubmitContainerStateChange(change ContainerStateChange) utils.RetriableError @@ -219,16 +219,13 @@ func (client *ApiECSClient) registerContainerInstance(clusterRef string) (string return *resp.ContainerInstance.ContainerInstanceARN, nil } -func (client *ApiECSClient) SubmitTaskStateChange(change ContainerStateChange) utils.RetriableError { - if change.TaskStatus == TaskStatusNone { +func (client *ApiECSClient) SubmitTaskStateChange(change TaskStateChange) utils.RetriableError { + if change.Status == TaskStatusNone { log.Warn("SubmitTaskStateChange called with an invalid change", "change", change) return NewAPIError(errors.New("SubmitTaskStateChange called with an invalid change")) } - stat := change.TaskStatus.String() - if stat == "DEAD" { - stat = "STOPPED" - } + stat := change.Status.String() if stat != "STOPPED" && stat != "RUNNING" { log.Debug("Not submitting unsupported upstream task state", "state", stat) // Not really an error diff --git a/agent/api/errors.go b/agent/api/errors.go index 98bab9fbb4f..e0592ba0981 100644 --- a/agent/api/errors.go +++ b/agent/api/errors.go @@ -43,3 +43,11 @@ func (apiErr *APIError) Retry() bool { func (apiErr *APIError) Error() string { return apiErr.err.Error() } + +type badVolumeError struct { + msg string +} + +func (err *badVolumeError) Error() string { return err.msg } +func (err *badVolumeError) ErrorName() string { return "InvalidVolumeError" } +func (err *badVolumeError) Retry() bool { return false } diff --git a/agent/api/json.go b/agent/api/json.go index 8389f8636a6..9553164894b 100644 --- a/agent/api/json.go +++ b/agent/api/json.go @@ -27,14 +27,23 @@ func (ts *TaskStatus) UnmarshalJSON(b []byte) error { return nil } if b[0] != '"' || b[len(b)-1] != '"' { - *ts = TaskStatusUnknown + *ts = TaskStatusNone return errors.New("TaskStatus must be a string or null") } strStatus := string(b[1 : len(b)-1]) + // 'UNKNOWN' and 'DEAD' for Compatibility with v1.0.0 state files + if strStatus == "UNKNOWN" { + *ts = TaskStatusNone + return nil + } + if strStatus == "DEAD" { + *ts = TaskStopped + return nil + } stat, ok := taskStatusMap[strStatus] if !ok { - *ts = TaskStatusUnknown + *ts = TaskStatusNone return errors.New("Unrecognized TaskStatus") } *ts = stat @@ -54,14 +63,23 @@ func (cs *ContainerStatus) UnmarshalJSON(b []byte) error { return nil } if b[0] != '"' || b[len(b)-1] != '"' { - *cs = ContainerStatusUnknown + *cs = ContainerStatusNone return errors.New("ContainerStatus must be a string or null; Got " + string(b)) } 
strStatus := string(b[1 : len(b)-1]) + // 'UNKNOWN' and 'DEAD' for Compatibility with v1.0.0 state files + if strStatus == "UNKNOWN" { + *cs = ContainerStatusNone + return nil + } + if strStatus == "DEAD" { + *cs = ContainerStopped + return nil + } stat, ok := containerStatusMap[strStatus] if !ok { - *cs = ContainerStatusUnknown + *cs = ContainerStatusNone return errors.New("Unrecognized ContainerStatus") } *cs = stat diff --git a/agent/api/status.go b/agent/api/status.go index 354eaa689df..8fe9b274713 100644 --- a/agent/api/status.go +++ b/agent/api/status.go @@ -15,11 +15,9 @@ package api var taskStatusMap = map[string]TaskStatus{ "NONE": TaskStatusNone, - "UNKNOWN": TaskStatusUnknown, "CREATED": TaskCreated, "RUNNING": TaskRunning, "STOPPED": TaskStopped, - "DEAD": TaskStopped, } func (ts *TaskStatus) String() string { @@ -28,7 +26,7 @@ func (ts *TaskStatus) String() string { return k } } - return "UNKNOWN" + return "NONE" } // Mapping task status in the agent to that in the backend @@ -42,14 +40,16 @@ func (ts *TaskStatus) BackendStatus() string { return "PENDING" } +func (ts *TaskStatus) BackendRecognized() bool { + return *ts == TaskRunning || *ts == TaskStopped +} + var containerStatusMap = map[string]ContainerStatus{ "NONE": ContainerStatusNone, - "UNKNOWN": ContainerStatusUnknown, "PULLED": ContainerPulled, "CREATED": ContainerCreated, "RUNNING": ContainerRunning, "STOPPED": ContainerStopped, - "DEAD": ContainerStopped, } func (cs *ContainerStatus) String() string { @@ -58,7 +58,7 @@ func (cs *ContainerStatus) String() string { return k } } - return "UNKNOWN" + return "NONE" } func (cs *ContainerStatus) TaskStatus() TaskStatus { @@ -72,7 +72,7 @@ func (cs *ContainerStatus) TaskStatus() TaskStatus { case ContainerStopped: return TaskStopped } - return TaskStatusUnknown + return TaskStatusNone } func (ts *TaskStatus) ContainerStatus() ContainerStatus { @@ -86,7 +86,11 @@ func (ts *TaskStatus) ContainerStatus() ContainerStatus { case TaskStopped: return ContainerStopped } - return ContainerStatusUnknown + return ContainerStatusNone +} + +func (cs *ContainerStatus) BackendRecognized() bool { + return *cs == ContainerRunning || *cs == ContainerStopped } func (cs *ContainerStatus) Terminal() bool { diff --git a/agent/api/task.go b/agent/api/task.go index f17284c76f8..327ba9014aa 100644 --- a/agent/api/task.go +++ b/agent/api/task.go @@ -131,33 +131,23 @@ func (task *Task) UpdateMountPoints(cont *Container, vols map[string]string) { } } -// InferContainerDesiredStatus ensures that all container's desired statuses are -// compatible with whatever status the task desires to be at or is at. -// This is used both to initialize container statuses of new tasks and to force -// auxilery containers into terminal states (e.g. the essential containers died -// already) -func (task *Task) InferContainerDesiredStatus() { +// UpdateContainerDesiredStatus sets all container's desired status's to the +// task's desired status +func (task *Task) UpdateContainerDesiredStatus() { for _, c := range task.Containers { - c.DesiredStatus = task.maxStatus().ContainerStatus() - } -} - -func (task *Task) maxStatus() *TaskStatus { - if task.KnownStatus > task.DesiredStatus { - return &task.KnownStatus + if c.DesiredStatus < task.DesiredStatus.ContainerStatus() { + c.DesiredStatus = task.DesiredStatus.ContainerStatus() + } } - return &task.DesiredStatus } -// UpdateTaskState updates the given task's status based on its container's status. 
-// For example, if an essential container stops, it will set the task to -// stopped. +// UpdateTaskKnownState updates the given task's status based on its container's status. +// It updates to the minimum of all containers no matter what // It returns a TaskStatus indicating what change occured or TaskStatusNone if // there was no change -func (task *Task) UpdateTaskStatus() (newStatus TaskStatus) { - //The task is the minimum status of all its essential containers unless the - //status is terminal in which case it's that status - log.Debug("Updating task", "task", task) +func (task *Task) UpdateTaskKnownStatus() (newStatus TaskStatus) { + llog := log.New("task", task) + llog.Debug("Updating task") defer func() { if newStatus != TaskStatusNone { task.KnownTime = time.Now() @@ -165,53 +155,17 @@ func (task *Task) UpdateTaskStatus() (newStatus TaskStatus) { }() // Set to a large 'impossible' status that can't be the min - essentialContainersEarliestStatus := ContainerZombie - allContainersEarliestStatus := ContainerZombie + earliestStatus := ContainerZombie for _, cont := range task.Containers { - log.Debug("On container", "cont", cont) - if cont.KnownStatus < allContainersEarliestStatus { - allContainersEarliestStatus = cont.KnownStatus - } - if !cont.Essential { - continue - } - - if cont.KnownStatus.Terminal() && !task.KnownStatus.Terminal() { - // Any essential & terminal container brings down the whole task - task.KnownStatus = TaskStopped - return task.KnownStatus + if cont.KnownStatus < earliestStatus { + earliestStatus = cont.KnownStatus } - // Non-terminal - if cont.KnownStatus < essentialContainersEarliestStatus { - essentialContainersEarliestStatus = cont.KnownStatus - } - } - - if essentialContainersEarliestStatus == ContainerZombie { - log.Warn("Task with no essential containers; all properly formed tasks should have at least one essential container", "task", task) - - // If there are no essential containers, assume the container with the - // earliest status is essential and proceed. 
- essentialContainersEarliestStatus = allContainersEarliestStatus } - log.Debug("Earliest essential status is " + essentialContainersEarliestStatus.String()) - - if essentialContainersEarliestStatus == ContainerCreated { - if task.KnownStatus < TaskCreated { - task.KnownStatus = TaskCreated - return task.KnownStatus - } - } else if essentialContainersEarliestStatus == ContainerRunning { - if task.KnownStatus < TaskRunning { - task.KnownStatus = TaskRunning - return task.KnownStatus - } - } else if essentialContainersEarliestStatus == ContainerStopped { - if task.KnownStatus < TaskStopped { - task.KnownStatus = TaskStopped - return task.KnownStatus - } + llog.Debug("Earliest status is " + earliestStatus.String()) + if task.KnownStatus < earliestStatus.TaskStatus() { + task.KnownStatus = earliestStatus.TaskStatus() + return task.KnownStatus } return TaskStatusNone } @@ -290,7 +244,7 @@ func (task *Task) dockerConfigVolumes(container *Container) (map[string]struct{} for _, m := range container.MountPoints { vol, exists := task.HostVolumeByName(m.SourceVolume) if !exists { - return nil, errors.New("Container references non-existent volume") + return nil, &badVolumeError{"Container " + container.Name + " in task " + task.Arn + " references invalid volume " + m.SourceVolume} } // you can handle most volume mount types in the HostConfig at run-time; // empty mounts are created by docker at create-time (Config) so set @@ -298,7 +252,7 @@ func (task *Task) dockerConfigVolumes(container *Container) (map[string]struct{} if container.Name == emptyHostVolumeName && container.IsInternal { _, ok := vol.(*EmptyHostVolume) if !ok { - return nil, errors.New("invalid state; internal emptyvolume container with non empty volume") + return nil, &badVolumeError{"Empty volume container in task " + task.Arn + " was the wrong type"} } volumeMap[m.ContainerPath] = struct{}{} @@ -435,3 +389,47 @@ func TaskFromACS(task *ecsacs.Task) (*Task, error) { } return outTask, nil } + +func (t *Task) SetDesiredStatus(status TaskStatus) { + llog := log.New("task", t, "status", status.String()) + + if status < t.DesiredStatus { + llog.Warn("Recieved event asking task to move backwards in desired; ignoring") + } else if status == t.DesiredStatus { + llog.Info("Redundant backend event; desired = desired") + } else { + llog.Debug("Updating task desired status") + t.DesiredStatus = status + t.UpdateContainerDesiredStatus() + } +} + +// UpdateTaskDesiredStatus determines what status the task should properly be at based on its container's statuses +func (task *Task) UpdateTaskDesiredStatus() { + llog := log.New("task", task) + llog.Debug("Updating task") + + // A task's desired status is stopped if any essential container is stopped + // Otherwise, the task's desired status is unchanged (typically running, but no need to change) + for _, cont := range task.Containers { + if cont.Essential && (cont.KnownStatus.Terminal() || cont.DesiredStatus.Terminal()) { + llog.Debug("Updating task desired status to stopped", "container", cont.Name) + task.DesiredStatus = TaskStopped + } + } +} + +// UpdateState updates a task's known and desired statuses to be compatible +// with all of its containers +// It will return a bool indicating if there was a change +func (t *Task) UpdateStatus() bool { + change := t.UpdateTaskKnownStatus() + // DesiredStatus can change based on a new known status + t.UpdateDesiredStatus() + return change != TaskStatusNone +} + +func (t *Task) UpdateDesiredStatus() { + t.UpdateTaskDesiredStatus() + 
t.UpdateContainerDesiredStatus() +} diff --git a/agent/api/types.go b/agent/api/types.go index b2b84bee4f0..4549810cf10 100644 --- a/agent/api/types.go +++ b/agent/api/types.go @@ -25,7 +25,6 @@ type ContainerStatus int32 const ( TaskStatusNone TaskStatus = iota - TaskStatusUnknown TaskPulled TaskCreated TaskRunning @@ -34,7 +33,6 @@ const ( const ( ContainerStatusNone ContainerStatus = iota - ContainerStatusUnknown ContainerPulled ContainerCreated ContainerRunning @@ -118,10 +116,22 @@ type ContainerStateChange struct { ExitCode *int PortBindings []PortBinding - TaskStatus TaskStatus // TaskStatusNone if this does not result in a task state change + // This bit is a little hacky; a pointer to the container's sentstatus which + // may be updated to indicate what status was sent. This is used to ensure + // the same event is handled only once. + SentStatus *ContainerStatus +} - Task *Task - Container *Container +type TaskStateChange struct { + TaskArn string + Status TaskStatus + Reason string + + // As above, this is the same sort of hacky. + // This is a pointer to the task's sent-status that gives the event handler a + // hook into storing metadata about the task on the task such that it follows + // the lifecycle of the task and so on. + SentStatus *TaskStatus } func (t *Task) String() string { diff --git a/agent/engine/docker_container_engine.go b/agent/engine/docker_container_engine.go index a71ded6cbdc..f05d2740d1a 100644 --- a/agent/engine/docker_container_engine.go +++ b/agent/engine/docker_container_engine.go @@ -33,13 +33,14 @@ import ( type DockerClient interface { ContainerEvents() (<-chan DockerContainerChangeEvent, error) - PullImage(image string) error - CreateContainer(*docker.Config, string) (string, error) - StartContainer(string, *docker.HostConfig) error - StopContainer(string) error + PullImage(image string) DockerContainerMetadata + CreateContainer(*docker.Config, string) DockerContainerMetadata + StartContainer(string, *docker.HostConfig) DockerContainerMetadata + StopContainer(string) DockerContainerMetadata + RemoveContainer(string) error - GetContainerName(string) (string, error) + GetContainerName(string) (string, error) InspectContainer(string) (*docker.Container, error) DescribeContainer(string) (api.ContainerStatus, error) @@ -78,17 +79,18 @@ func NewDockerGoClient() (*DockerGoClient, error) { return dg, err } -func (dg *DockerGoClient) PullImage(image string) error { - log.Info("Pulling image", "image", image) +func (dg *DockerGoClient) PullImage(image string) DockerContainerMetadata { + log.Debug("Pulling image", "image", image) client, err := dg.client() if err != nil { - return err + return DockerContainerMetadata{Error: err} } // Special case; this image is not one that should be pulled, but rather // should be created locally if necessary if image == emptyvolume.Image+":"+emptyvolume.Tag { - return dg.createScratchImageIfNotExists() + err = dg.createScratchImageIfNotExists() + return DockerContainerMetadata{Error: err} } authConfig := dockerauth.GetAuthconfig(image) @@ -98,6 +100,7 @@ func (dg *DockerGoClient) PullImage(image string) error { defer pullLock.Unlock() pullDebugOut, pullWriter := io.Pipe() + defer pullWriter.Close() opts := docker.PullImageOptions{ Repository: image, OutputStream: pullWriter, @@ -111,13 +114,13 @@ func (dg *DockerGoClient) PullImage(image string) error { log.Debug("Pulling image", "image", image, "status", string(line[:])) line, _, err = reader.ReadLine() } - if err != nil { - log.Error("Error reading pull image status", 
"image", image, "err", err) + if err != nil && err != io.EOF { + log.Warn("Error reading pull image status", "image", image, "err", err) } }() err = client.PullImage(opts, authConfig) - return err + return DockerContainerMetadata{Error: err} } func (dg *DockerGoClient) createScratchImageIfNotExists() error { @@ -153,40 +156,33 @@ func (dg *DockerGoClient) createScratchImageIfNotExists() error { return err } -func (dg *DockerGoClient) CreateContainer(config *docker.Config, name string) (string, error) { +func (dg *DockerGoClient) CreateContainer(config *docker.Config, name string) DockerContainerMetadata { client, err := dg.client() if err != nil { - return "", err - } - - // Ensure this image was pulled so this can be a quick operation (taskEngine - // is blocked on this) - _, err = client.InspectImage(config.Image) - if err != nil { - return "", err + return DockerContainerMetadata{Error: err} } containerOptions := docker.CreateContainerOptions{Config: config, Name: name} dockerContainer, err := client.CreateContainer(containerOptions) - if err != nil { - return "", err + return DockerContainerMetadata{Error: err} } - return dockerContainer.ID, nil + return dg.containerMetadata(dockerContainer.ID) } -func (dg *DockerGoClient) StartContainer(id string, hostConfig *docker.HostConfig) error { +func (dg *DockerGoClient) StartContainer(id string, hostConfig *docker.HostConfig) DockerContainerMetadata { client, err := dg.client() if err != nil { - return err + return DockerContainerMetadata{Error: err, DockerId: id} } err = client.StartContainer(id, hostConfig) + metadata := dg.containerMetadata(id) if err != nil { - return err + metadata.Error = err } - return nil + return metadata } func dockerStateToState(state docker.State) api.ContainerStatus { @@ -199,16 +195,16 @@ func dockerStateToState(state docker.State) api.ContainerStatus { func (dg *DockerGoClient) DescribeContainer(dockerId string) (api.ContainerStatus, error) { client, err := dg.client() if err != nil { - return api.ContainerStatusUnknown, err + return api.ContainerStatusNone, err } if len(dockerId) == 0 { - return api.ContainerStatusUnknown, errors.New("Invalid container id: ''") + return api.ContainerStatusNone, errors.New("Invalid container id: ''") } dockerContainer, err := client.InspectContainer(dockerId) if err != nil { - return api.ContainerStatusUnknown, err + return api.ContainerStatusNone, err } return dockerStateToState(dockerContainer.State), nil } @@ -221,12 +217,24 @@ func (dg *DockerGoClient) InspectContainer(dockerId string) (*docker.Container, return client.InspectContainer(dockerId) } -func (dg *DockerGoClient) StopContainer(dockerId string) error { +func (dg *DockerGoClient) StopContainer(dockerId string) DockerContainerMetadata { client, err := dg.client() if err != nil { - return err + metadata := dg.containerMetadata(dockerId) + if metadata.Error == nil { + metadata.Error = err + } + return metadata } - return client.StopContainer(dockerId, DEFAULT_TIMEOUT_SECONDS) + err = client.StopContainer(dockerId, DEFAULT_TIMEOUT_SECONDS) + metadata := dg.containerMetadata(dockerId) + if err != nil { + log.Debug("Error stopping container", "err", err, "id", dockerId) + if metadata.Error == nil { + metadata.Error = err + } + } + return metadata } func (dg *DockerGoClient) RemoveContainer(dockerId string) error { @@ -237,12 +245,16 @@ func (dg *DockerGoClient) RemoveContainer(dockerId string) error { return client.RemoveContainer(docker.RemoveContainerOptions{ID: dockerId, RemoveVolumes: true, Force: false}) } -func (dg 
*DockerGoClient) StopContainerById(id string) error { +func (dg *DockerGoClient) StopContainerById(id string) DockerContainerMetadata { client, err := dg.client() if err != nil { - return err + return DockerContainerMetadata{Error: err} + } + err = client.StopContainer(id, DEFAULT_TIMEOUT_SECONDS) + if err != nil { + return DockerContainerMetadata{Error: err} } - return client.StopContainer(id, DEFAULT_TIMEOUT_SECONDS) + return dg.containerMetadata(id) } func (dg *DockerGoClient) GetContainerName(id string) (string, error) { @@ -267,7 +279,7 @@ func (dg *DockerGoClient) client() (*docker.Client, error) { // Re-read the env in case they corrected it endpoint := utils.DefaultIfBlank(os.Getenv(DOCKER_ENDPOINT_ENV_VARIABLE), DOCKER_DEFAULT_ENDPOINT) - client, err := docker.NewVersionedClient(endpoint, "1.15") + client, err := docker.NewVersionedClient(endpoint, "1.17") if err != nil { log.Error("Unable to conect to docker client. Ensure daemon is running", "endpoint", endpoint, "err", err) return nil, err @@ -277,6 +289,40 @@ func (dg *DockerGoClient) client() (*docker.Client, error) { return dockerclient, err } +func (dg *DockerGoClient) containerMetadata(id string) DockerContainerMetadata { + dockerContainer, err := dg.InspectContainer(id) + if err != nil { + return DockerContainerMetadata{Error: err} + } + var bindings []api.PortBinding + if dockerContainer.NetworkSettings != nil { + // Convert port bindings into the format our container expects + bindings, err = api.PortBindingFromDockerPortBinding(dockerContainer.NetworkSettings.Ports) + if err != nil { + log.Crit("Docker had network bindings we couldn't understand", "err", err) + return DockerContainerMetadata{Error: err} + } + } + metadata := DockerContainerMetadata{ + DockerId: id, + PortBindings: bindings, + Volumes: dockerContainer.Volumes, + } + if dockerContainer.State.Running == false { + metadata.ExitCode = &dockerContainer.State.ExitCode + } + if dockerContainer.State.Error != "" { + // TODO type this so that it shows up as 'DockerError: ' + metadata.Error = errors.New(dockerContainer.State.Error) + } + if dockerContainer.State.OOMKilled { + // TODO type this so it shows up as 'OutOfMemoryError: ...' + metadata.Error = errors.New("Memory limit exceeded; container killed") + } + + return metadata +} + // Listen to the docker event stream for container changes and pass them up func (dg *DockerGoClient) ContainerEvents() (<-chan DockerContainerChangeEvent, error) { client, err := dg.client() @@ -297,9 +343,11 @@ func (dg *DockerGoClient) ContainerEvents() (<-chan DockerContainerChangeEvent, go func() { for event := range events { - log.Debug("Got event from docker daemon", "event", event) containerId := event.ID - image := event.From + if containerId == "" { + continue + } + log.Debug("Got event from docker daemon", "event", event) var status api.ContainerStatus switch event.Status { @@ -311,6 +359,8 @@ func (dg *DockerGoClient) ContainerEvents() (<-chan DockerContainerChangeEvent, fallthrough case "die": fallthrough + case "oom": + fallthrough case "kill": status = api.ContainerStopped case "destroy": @@ -320,12 +370,22 @@ func (dg *DockerGoClient) ContainerEvents() (<-chan DockerContainerChangeEvent, // Image events case "pull": + fallthrough case "untag": + fallthrough case "delete": + // No interest in image events + continue default: - log.Warn("Unknown status event! Maybe docker updated? ", "status", event.Status) + log.Info("Unknown status event! Maybe docker updated? 
", "status", event.Status) + } + + metadata := dg.containerMetadata(containerId) + + changedContainers <- DockerContainerChangeEvent{ + Status: status, + DockerContainerMetadata: metadata, } - changedContainers <- DockerContainerChangeEvent{DockerId: containerId, Image: image, Status: status} } }() diff --git a/agent/engine/docker_container_engine_integ_test.go b/agent/engine/docker_container_engine_integ_test.go deleted file mode 100644 index fd3db5eb335..00000000000 --- a/agent/engine/docker_container_engine_integ_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package engine - -import ( - "os" - "testing" -) - -func TestPullLibraryImage(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } - - dgc, err := NewDockerGoClient() - if err != nil { - t.Errorf("Unable to create client: %v", err) - } - - for _, image := range []string{"busybox", "library/busybox:latest", "busybox:latest"} { - err = dgc.PullImage(image) - if err != nil { - t.Errorf("Error pulling library image: %v", err) - } - } -} diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 27c44496fa9..c1b90b77048 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -28,7 +28,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/statemanager" "github.com/aws/amazon-ecs-agent/agent/utils" "github.com/aws/amazon-ecs-agent/agent/utils/ttime" - "github.com/fsouza/go-dockerclient" ) const ( @@ -39,28 +38,59 @@ const ( ) const ( - sweepInterval = 5 * time.Minute - taskStoppedDuration = 3 * time.Hour + taskStoppedDuration = 3 * time.Hour + steadyStateTaskVerifyInterval = 5 * time.Minute ) +type acsTaskUpdate struct { + api.TaskStatus +} + +type dockerContainerChange struct { + container *api.Container + event DockerContainerChangeEvent +} + +type managedTask struct { + *api.Task + + acsMessages chan api.TaskStatus + dockerMessages chan dockerContainerChange +} + +func (engine *DockerTaskEngine) newManagedTask(task *api.Task) *managedTask { + t := &managedTask{ + Task: task, + acsMessages: make(chan api.TaskStatus), + dockerMessages: make(chan dockerContainerChange), + } + engine.managedTasks[task.Arn] = t + return t +} + // The DockerTaskEngine interacts with docker to implement a task // engine type DockerTaskEngine struct { // implements TaskEngine - state *dockerstate.DockerTaskEngineState + // state stores all tasks this task engine is aware of, including their + // current state and mappings to/from dockerId and name. 
+ // This is used to checkpoint state to disk so tasks may survive agent + // failures or updates + state *dockerstate.DockerTaskEngineState + managedTasks map[string]*managedTask - events <-chan DockerContainerChangeEvent - container_events chan api.ContainerStateChange - saver statemanager.Saver + events <-chan DockerContainerChangeEvent + containerEvents chan api.ContainerStateChange + taskEvents chan api.TaskStateChange + saver statemanager.Saver client DockerClient - // The processTasks mutex can be used to wait for all tasks to stop - // transitioning before doing a final state save + exit. When write-locked - // new tasks will not be processed. Anything transitioning a tasks state - // should aquire a read-lock. - processTasks sync.RWMutex + // processTasks is a mutex that the task engine must aquire before changing + // any task's state which it manages. Since this is a lock that encompasses + // all tasks, it must not aquire it for any significant duration + processTasks sync.Mutex } // NewDockerTaskEngine returns a created, but uninitialized, DockerTaskEngine. @@ -72,9 +102,11 @@ func NewDockerTaskEngine(cfg *config.Config) *DockerTaskEngine { client: nil, saver: statemanager.NewNoopStateManager(), - state: dockerstate.NewDockerTaskEngineState(), + state: dockerstate.NewDockerTaskEngineState(), + managedTasks: make(map[string]*managedTask), - container_events: make(chan api.ContainerStateChange), + containerEvents: make(chan api.ContainerStateChange), + taskEvents: make(chan api.TaskStateChange), } dockerauth.SetConfig(cfg) @@ -111,8 +143,6 @@ func (engine *DockerTaskEngine) Init() error { // Now catch up and start processing new events per normal go engine.handleDockerEvents() - go engine.sweepTasks() - return nil } @@ -165,9 +195,8 @@ func (engine *DockerTaskEngine) synchronizeState() { continue } for _, cont := range conts { - var reason string if cont.DockerId == "" { - log.Debug("Found container created while we were down", "name", cont.DockerName) + log.Debug("Found container potentially created while we were down", "name", cont.DockerName) // Figure out the dockerid describedCont, err := engine.client.InspectContainer(cont.DockerName) if err != nil { @@ -183,45 +212,21 @@ func (engine *DockerTaskEngine) synchronizeState() { if err != nil { currentState = api.ContainerStopped if !cont.Container.KnownTerminal() { + // TODO error type + cont.Container.ApplyingError = api.NewApplyingError(errors.New("Docker did not recognize container id after an ECS Agent restart")) log.Warn("Could not describe previously known container; assuming dead", "err", err) - reason = "Docker did not recognize container id after an ECS Agent restart." } } if currentState > cont.Container.KnownStatus { cont.Container.KnownStatus = currentState } } - // Over-aggressively resend everything. The task handler will - // discard items that have already been sent. - // We cannot actually emit an event yet because nothing is handling - // events; just throw it in a goroutine so this doesn't block - // forever. - go engine.emitEvent(task, cont, reason) } + engine.startTask(task) } engine.saver.Save() } -// sweepTasks periodically sweeps through all tasks looking for tasks that have -// been in the 'stopped' state for a sufficiently long time. At that time it -// deletes them and removes them from its "state". 
-func (engine *DockerTaskEngine) sweepTasks() { - for { - tasks := engine.state.AllTasks() - - for _, task := range tasks { - if task.KnownStatus.Terminal() { - if ttime.Since(task.KnownTime) > taskStoppedDuration { - engine.sweepTask(task) - engine.state.RemoveTask(task) - } - } - } - - ttime.Sleep(sweepInterval) - } -} - // sweepTask deletes all the containers associated with a task func (engine *DockerTaskEngine) sweepTask(task *api.Task) { for _, cont := range task.Containers { @@ -232,21 +237,37 @@ func (engine *DockerTaskEngine) sweepTask(task *api.Task) { } } -// emitEvent passes a given event up through the container_event channel. -// It also will update the task's knownStatus to match the container's -// knownStatus -func (engine *DockerTaskEngine) emitEvent(task *api.Task, container *api.DockerContainer, reason string) { - err := engine.updateContainerMetadata(task, container) - - // Every time something changes, make sure the state for the thing that - // changed is known about and move forwards if this change allows us to - defer engine.applyTaskState(task) +func (engine *DockerTaskEngine) emitTaskEevnt(task *api.Task, reason string) { + if !task.KnownStatus.BackendRecognized() { + return + } + if task.SentStatus >= task.KnownStatus { + log.Debug("Already sent task event; no need to re-send", "task", task.Arn, "event", task.KnownStatus.String()) + return + } + event := api.TaskStateChange{ + TaskArn: task.Arn, + Status: task.KnownStatus, + Reason: reason, + SentStatus: &task.SentStatus, + } + log.Info("Task change event", "event", event) + engine.taskEvents <- event +} - // Collect additional info we need for our StateChanges - if err != nil { - log.Crit("Error updating container metadata", "err", err) +// emitContainerEvent passes a given event up through the containerEvents channel if necessary. +// It will omit events the backend would not process and will perform best-effort deduplication of events. 
+func (engine *DockerTaskEngine) emitContainerEvent(task *api.Task, cont *api.Container, reason string) { + if !cont.KnownStatus.BackendRecognized() { + return + } + if cont.IsInternal { + return + } + if cont.SentStatus >= cont.KnownStatus { + log.Debug("Already sent container event; no need to re-send", "task", task.Arn, "container", cont.Name, "event", cont.KnownStatus.String()) + return } - cont := container.Container if reason == "" && cont.ApplyingError != nil { reason = cont.ApplyingError.Error() @@ -258,18 +279,11 @@ func (engine *DockerTaskEngine) emitEvent(task *api.Task, container *api.DockerC ExitCode: cont.KnownExitCode, PortBindings: cont.KnownPortBindings, Reason: reason, - Task: task, - Container: cont, + SentStatus: &cont.SentStatus, } - if task_change := task.UpdateTaskStatus(); task_change != api.TaskStatusNone { - log.Info("Task change event", "state", task_change) - event.TaskStatus = task_change - } - log.Info("Container change event", "event", event) - if cont.IsInternal { - return - } - engine.container_events <- event + log.Debug("Container change event", "event", event) + engine.containerEvents <- event + log.Debug("Container change event passed on", "event", event) } // openEventstream opens, but does not consume, the docker event stream @@ -286,7 +300,7 @@ func (engine *DockerTaskEngine) openEventstream() error { // event that it reads from the docker eventstream func (engine *DockerTaskEngine) handleDockerEvents() { for event := range engine.events { - log.Info("Handling an event", "event", event) + log.Debug("Handling a docker event", "event", event) task, task_found := engine.state.TaskById(event.DockerId) cont, container_found := engine.state.ContainerById(event.DockerId) @@ -294,311 +308,313 @@ func (engine *DockerTaskEngine) handleDockerEvents() { log.Debug("Event for container not managed", "dockerId", event.DockerId) continue } - // Update the status to what we now know to be the true status - if cont.Container.KnownStatus < event.Status { - cont.Container.KnownStatus = event.Status - engine.emitEvent(task, cont, "") - } else if cont.Container.KnownStatus == event.Status { - log.Warn("Redundant docker event; unusual but not critical", "event", event, "cont", cont) - } else { - if !cont.Container.KnownTerminal() { - log.Crit("Docker container went backwards in state! This container will no longer be managed", "cont", cont, "event", event) - } + engine.processTasks.Lock() + managedTask, ok := engine.managedTasks[task.Arn] + engine.processTasks.Unlock() + if !ok { + log.Crit("Could not find managed task corresponding to a docker event", "event", event, "task", task) } + managedTask.dockerMessages <- dockerContainerChange{container: cont.Container, event: event} } log.Crit("Docker event stream closed unexpectedly") } -// updateContainerMetadata updates a minor set of metadata about a container -// that cannot be fully determined beforehand. Specifically, it will determine -// the exit code when a container stops, and the portBindings when it is started -// (and thus they are fully resolved). 
-func (engine *DockerTaskEngine) updateContainerMetadata(task *api.Task, container *api.DockerContainer) error { - if container.DockerId == "" { - return nil - } - llog := log.New("task", task, "container", container) - switch container.Container.KnownStatus { - case api.ContainerRunning: - containerInfo, err := engine.client.InspectContainer(container.DockerId) - if err != nil { - llog.Error("Error inspecting container", "err", err) - return err - } - - // Port bindings - if containerInfo.NetworkSettings != nil { - // Convert port bindings into the format our container expects - bindings, err := api.PortBindingFromDockerPortBinding(containerInfo.NetworkSettings.Ports) - if err != nil { - return err - } - container.Container.KnownPortBindings = bindings - } - - task.UpdateMountPoints(container.Container, containerInfo.Volumes) - case api.ContainerStopped: - containerInfo, err := engine.client.InspectContainer(container.DockerId) - if err != nil { - llog.Error("Error inspecting container", "err", err) - return err - } - - // Exit code - log.Debug("Updating exit code", "exit code", containerInfo.State.ExitCode) - container.Container.KnownExitCode = &containerInfo.State.ExitCode - } - - return nil -} - // TaskEvents returns channels to read task and container state changes. These // changes should be read as soon as possible as them not being read will block -// processing tasks and events. -func (engine *DockerTaskEngine) TaskEvents() <-chan api.ContainerStateChange { - return engine.container_events +// processing the task referenced by the event. +func (engine *DockerTaskEngine) TaskEvents() (<-chan api.TaskStateChange, <-chan api.ContainerStateChange) { + return engine.taskEvents, engine.containerEvents } -// TaskCompleted evaluates if a task is at a steady state; that is that all the -// containers have reached their desired status as well as the task itself -func TaskCompleted(task *api.Task) bool { - if task.KnownStatus < task.DesiredStatus { - return false - } - for _, container := range task.Containers { - if container.KnownStatus < container.DesiredStatus { - return false - } - } - return true -} - -func (engine *DockerTaskEngine) AddTask(task *api.Task) error { - task.PostUnmarshalTask() - - engine.processTasks.RLock() - task = engine.state.AddOrUpdateTask(task) - engine.processTasks.RUnlock() - - engine.applyTaskState(task) - return nil -} - -type transitionApplyFunc (func(*api.Task, *api.Container) error) +// startTask creates a taskState construct to track the task and then begins +// pushing it towards its desired state when allowed startTask is protected by +// the processTasks lock of 'AddTask'. It should not be called from anywhere +// else and should exit quickly to allow AddTask to do more work. +func (engine *DockerTaskEngine) startTask(task *api.Task) { + // Create a channel that may be used to communicate with this task, survey + // what tasks need to be waited for for this one to start, and then spin off + // a goroutine to oversee this task + + thisTask := engine.newManagedTask(task) + + go func(task *managedTask) { + llog := log.New("task", task) + // This goroutine now owns the lifecycle of this managedTask. No other + // thread should access this task or any of its members. 
+ + // Do a single updatestatus at the beginning to create the container + // 'desiredstatus'es which are a construct of the engine used only here, + // not present on the backend + task.UpdateDesiredStatus() + for { + // First check for new transitions and let them 'skip to the head of the line' so to speak + // This lets a desiredStopped preempt moving to running / created + handleAcsMessage := func(desiredStatus api.TaskStatus) { + // Handle acs message changes this task's desired status to whatever + // acs says it should be if it is compatible + llog.Debug("New acs transition", "status", desiredStatus.String()) + task.DesiredStatus = desiredStatus + task.UpdateDesiredStatus() + } + handleDockerMessage := func(dockerMessage dockerContainerChange) { + // Handle docker message sets a container's known status to whatever + // docker said it was if it's a compatible change. + // In addition, if docker mentions interesting information (like exit + // codes or ports) this propegates them. + container := dockerMessage.container + found := false + for _, c := range task.Containers { + if container == c { + found = true + } + } + if !found { + llog.Crit("State error; task manager called with another task's container!", "container", container) + return + } + event := dockerMessage.event -func tryApplyTransition(task *api.Task, container *api.Container, to api.ContainerStatus, f transitionApplyFunc) error { - err := utils.RetryNWithBackoff(utils.NewSimpleBackoff(5*time.Second, 30*time.Second, 0.25, 2), 3, func() error { - return f(task, container) - }) + if event.Status <= container.KnownStatus { + llog.Info("Redundant status change; ignoring", "current", container.KnownStatus.String(), "change", event.Status.String()) + return + } + if event.Error != nil { + if event.Status == api.ContainerStopped { + // If we were trying to transition to stopped and had an error, we + // clearly can't just continue trying to transition it to stopped + // again and again... In this case, assume it's stopped (or close + // enough) and get on with it + // This actually happens a lot for the case of stopping something that was not running. + llog.Info("Error for 'docker stop' of container; assuming it's stopped anyways") + container.KnownStatus = api.ContainerStopped + container.DesiredStatus = api.ContainerStopped + } else if event.Status == api.ContainerPulled { + // Another special case; a failure to pull might not be fatal if e.g. the image already exists. 
+ } else { + llog.Warn("Error with docker; stopping container", "container", container, "err", event.Error) + container.DesiredStatus = api.ContainerStopped + } + if container.ApplyingError == nil { + container.ApplyingError = api.NewApplyingError(event.Error) + } + } + container.KnownStatus = event.Status - if err == nil { - container.AppliedStatus = to - } - return err -} + if event.ExitCode != nil && event.ExitCode != container.KnownExitCode { + container.KnownExitCode = event.ExitCode + } + if event.PortBindings != nil { + container.KnownPortBindings = event.PortBindings + } + if event.Volumes != nil { + // No need to emit an event for this; this information is not propogated up yet + task.UpdateMountPoints(container, event.Volumes) + } -func (engine *DockerTaskEngine) applyContainerState(task *api.Task, container *api.Container) { - engine.processTasks.RLock() - defer engine.processTasks.RUnlock() + engine.emitContainerEvent(task.Task, container, "") + if task.UpdateStatus() { + llog.Debug("Container change also resulted in task change") + // If knownStatus changed, let it be known + engine.emitTaskEevnt(task.Task, "") + } + } - container.StatusLock.Lock() - defer container.StatusLock.Unlock() + // If it's steadyState, just spin until we need to do work + for task.KnownStatus == api.TaskRunning && task.KnownStatus >= task.DesiredStatus { + llog.Debug("Task at steady state", "state", task.KnownStatus.String()) + maxWait := time.NewTimer(steadyStateTaskVerifyInterval) + + select { + case desiredStatus := <-task.acsMessages: + handleAcsMessage(desiredStatus) + case dockerChange := <-task.dockerMessages: + handleDockerMessage(dockerChange) + case <-maxWait.C: + llog.Debug("No task events in wait time; inspecting just in case") + //engine.verifyTaskStatus(task) + } + maxWait.Stop() + } - clog := log.New("task", task, "container", container) - if container.KnownStatus == container.DesiredStatus { - clog.Debug("Container at desired status", "desired", container.DesiredStatus) - return - } - if container.AppliedStatus >= container.DesiredStatus { - clog.Debug("Container already working towards desired status", "desired", container.DesiredStatus) - return - } - if container.KnownStatus > container.DesiredStatus { - clog.Debug("Container past desired status") - return - } - if !dependencygraph.DependenciesAreResolved(container, task.Containers) { - clog.Info("Can't apply state to container yet; dependencies unresolved", "state", container.DesiredStatus) - return - } - // If we got here, the KnownStatus < DesiredStatus and we haven't applied - // DesiredStatus yet; appliy a step towards it now + // If our task is not steady state we should be able to move forwards one or more containers + for _, cont := range task.Containers { + newStatus, metadata := engine.applyContainerState(task.Task, cont) + handleDockerMessage(dockerContainerChange{ + container: cont, + event: DockerContainerChangeEvent{ + Status: newStatus, + DockerContainerMetadata: metadata, + }, + }) + } - var err error - // Terminal cases are special. If our desired status is terminal, then - // immediately go there with no regard to creating or starting the container - if container.DesiredTerminal() { - // Terminal cases are also special in that we do not record any error - // in applying this state; this is because it could overwrite an error - // that caused us to stop it and the previous error is more useful to - // show. This is also the only state where an error results in a - // state-change submission anyways. 
- if container.AppliedStatus < api.ContainerStopped { - err = tryApplyTransition(task, container, api.ContainerStopped, engine.stopContainer) - if err != nil { - clog.Info("Unable to stop container", "err", err) - // If there was an error, assume we won't get an event in the - // eventstream and emit it ourselves. - container.KnownStatus = api.ContainerStopped - engine.emitEvent(task, &api.DockerContainer{Container: container}, "") - if _, ok := err.(*docker.NoSuchContainer); ok { - engine.state.RemoveTask(task) - } - err = nil + if task.KnownStatus == api.TaskStopped { + break } } - } else if container.AppliedStatus < api.ContainerPulled { - err = tryApplyTransition(task, container, api.ContainerPulled, engine.pullContainer) - if err != nil { - clog.Warn("Unable to pull container image", "err", err) - } else { - // PullImage is a special case; update KnownStatus because there is - // no corresponding event from the docker eventstream to update - // this with. - container.KnownStatus = api.ContainerPulled + // We only break out of the above if this task is known to be stopped. Do + // onetime cleanup here, including removing the task after a timeout + llog.Debug("Task has reached stopped. We're just waiting and removing containers now") + cleanupTime := ttime.After(task.KnownTime.Add(taskStoppedDuration).Sub(ttime.Now())) + + ContinueCleanup: + for { + select { + case <-task.dockerMessages: + case <-task.acsMessages: + llog.Debug("ACS message recieved for already stopped task") + case <-cleanupTime: + llog.Debug("Cleaning up task's containers and data") + break ContinueCleanup + } } - } - if !container.DesiredTerminal() { - if container.AppliedStatus < api.ContainerCreated { - err = tryApplyTransition(task, container, api.ContainerCreated, engine.createContainer) - if err != nil { - clog.Warn("Unable to create container", "err", err) - } - } else if container.AppliedStatus < api.ContainerRunning { - err = tryApplyTransition(task, container, api.ContainerRunning, engine.startContainer) - if err != nil { - clog.Warn("Unable to start container", "err", err) + // First make an attempt to cleanup resources + engine.sweepTask(task.Task) + engine.state.RemoveTask(task.Task) + // Now remove ourselves from the global state and cleanup channels + engine.processTasks.Lock() + delete(engine.managedTasks, task.Arn) + engine.processTasks.Unlock() + FinishCleanup: + for { + // Cleanup any leftover messages before closing. No new messages possible + // because we deleted ourselves from managedTasks + select { + case <-task.dockerMessages: + case <-task.acsMessages: + default: + break FinishCleanup } } - } - if err != nil { - // If we were unable to successfully accomplish a state transition, - // we should move that container to 'stopped' - container.ApplyingError = api.NewApplyingError(err) - container.DesiredStatus = api.ContainerStopped - // Because our desired status is now stopped, we should call this - // function again to actually stop it - go engine.applyContainerState(task, container) - } else { - clog.Debug("Successfully applied transition") - } + close(task.dockerMessages) + close(task.acsMessages) + }(thisTask) +} - engine.saver.Save() +// updateTask determines if a new transition needs to be applied to the +// referenced task, and if needed applies it. It should not be called anywhere +// but from 'AddTask' and is protected by the processTasks lock there. 
+func (engine *DockerTaskEngine) updateTask(task *api.Task, update *api.Task) { + managedTask, ok := engine.managedTasks[task.Arn] + if !ok { + log.Crit("ACS message for a task we thought we managed, but don't!", "arn", task.Arn) + // Is this the right thing to do? + // Calling startTask should overwrite our bad 'state' data with the new + // task which we do manage.. but this is still scary and shouldn't have happened + engine.startTask(update) + return + } + // No longer need the lock; we can head off into a goroutine to not block it being released + go func() { + log.Debug("Putting update on the acs channel", "task", task.Arn, "status", update.DesiredStatus) + managedTask.acsMessages <- update.DesiredStatus + log.Debug("Update was taken off the acs channel", "task", task.Arn, "status", update.DesiredStatus) + }() } -// ApplyTaskState checks if there is any work to be done on a given task or any -// of the containers belonging to it, and if there is work to be done that can -// be done, it does it. This function can be called frequently (and should be -// called anytime a container changes) and will do nothing if the task is at a -// steady state -func (engine *DockerTaskEngine) applyTaskState(task *api.Task) { - llog := log.New("task", task) - llog.Info("Top of ApplyTaskState") +func (engine *DockerTaskEngine) AddTask(task *api.Task) error { + task.PostUnmarshalTask() - task.InferContainerDesiredStatus() + engine.processTasks.Lock() + defer engine.processTasks.Unlock() - if !dependencygraph.ValidDependencies(task) { - llog.Error("Invalid task dependency graph") - return - } - if TaskCompleted(task) { - llog.Info("Task completed, not acting upon it") - return + existingTask, exists := engine.state.TaskByArn(task.Arn) + if !exists { + engine.state.AddTask(task) + engine.startTask(task) + } else { + engine.updateTask(existingTask, task) } - for _, container := range task.Containers { - go engine.applyContainerState(task, container) - } + return nil +} + +type transitionApplyFunc (func(*api.Task, *api.Container) DockerContainerMetadata) + +func tryApplyTransition(task *api.Task, container *api.Container, to api.ContainerStatus, f transitionApplyFunc) DockerContainerMetadata { + return f(task, container) } func (engine *DockerTaskEngine) ListTasks() ([]*api.Task, error) { return engine.state.AllTasks(), nil } -func (engine *DockerTaskEngine) pullContainer(task *api.Task, container *api.Container) error { +func (engine *DockerTaskEngine) pullContainer(task *api.Task, container *api.Container) DockerContainerMetadata { log.Info("Pulling container", "task", task, "container", container) - err := engine.client.PullImage(container.Image) - if err != nil { - return err - } - return nil + return engine.client.PullImage(container.Image) } -func (engine *DockerTaskEngine) createContainer(task *api.Task, container *api.Container) error { +func (engine *DockerTaskEngine) createContainer(task *api.Task, container *api.Container) DockerContainerMetadata { log.Info("Creating container", "task", task, "container", container) config, err := task.DockerConfig(container) if err != nil { - return err + return DockerContainerMetadata{Error: err} } - err = func() error { - name := "" - for i := 0; i < len(container.Name); i++ { - c := container.Name[i] - if !((c <= '9' && c >= '0') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c == '-')) { - continue - } - name += string(c) + name := "" + for i := 0; i < len(container.Name); i++ { + c := container.Name[i] + if !((c <= '9' && c >= '0') || (c >= 'a' && c 
<= 'z') || (c >= 'A' && c <= 'Z') || (c == '-')) { + continue } + name += string(c) + } - containerName := "ecs-" + task.Family + "-" + task.Version + "-" + name + "-" + utils.RandHex() - - // Lock state for writing so that handleDockerEvents will block on - // resolving the 'create' event's dockerid until it is actually in the - // added to state. - engine.state.Lock() - defer engine.state.Unlock() + containerName := "ecs-" + task.Family + "-" + task.Version + "-" + name + "-" + utils.RandHex() - // Pre-add the container in case we stop before the next, more useful, - // AddContainer call. This ensures we have a way to get the container if - // we die before 'createContainer' returns because we can inspect by - // name - engine.state.AddContainer(&api.DockerContainer{DockerName: containerName, Container: container}, task) + // Pre-add the container in case we stop before the next, more useful, + // AddContainer call. This ensures we have a way to get the container if + // we die before 'createContainer' returns because we can inspect by + // name + engine.state.AddContainer(&api.DockerContainer{DockerName: containerName, Container: container}, task) - containerId, err := engine.client.CreateContainer(config, containerName) - if err != nil { - return err - } - engine.state.AddContainer(&api.DockerContainer{DockerId: containerId, DockerName: containerName, Container: container}, task) - log.Info("Created container successfully", "task", task, "container", container) - return nil - }() - return err + metadata := engine.client.CreateContainer(config, containerName) + if metadata.Error != nil { + return metadata + } + engine.state.AddContainer(&api.DockerContainer{DockerId: metadata.DockerId, DockerName: containerName, Container: container}, task) + log.Info("Created container successfully", "task", task, "container", container) + return metadata } -func (engine *DockerTaskEngine) startContainer(task *api.Task, container *api.Container) error { +func (engine *DockerTaskEngine) startContainer(task *api.Task, container *api.Container) DockerContainerMetadata { log.Info("Starting container", "task", task, "container", container) containerMap, ok := engine.state.ContainerMapByArn(task.Arn) if !ok { - return errors.New("No such task: " + task.Arn) + // TODO error type + return DockerContainerMetadata{Error: errors.New("No such task: " + task.Arn)} } dockerContainer, ok := containerMap[container.Name] if !ok { - return errors.New("No container named '" + container.Name + "' created in " + task.Arn) + // TODO type + return DockerContainerMetadata{Error: errors.New("No container named '" + container.Name + "' created in " + task.Arn)} } hostConfig, err := task.DockerHostConfig(container, containerMap) if err != nil { - return err + // TODO hostconfig function neesd an error tyep + return DockerContainerMetadata{Error: err} } return engine.client.StartContainer(dockerContainer.DockerId, hostConfig) } -func (engine *DockerTaskEngine) stopContainer(task *api.Task, container *api.Container) error { +func (engine *DockerTaskEngine) stopContainer(task *api.Task, container *api.Container) DockerContainerMetadata { log.Info("Stopping container", "task", task, "container", container) containerMap, ok := engine.state.ContainerMapByArn(task.Arn) if !ok { - return errors.New("No such task: " + task.Arn) + // TODO error type + return DockerContainerMetadata{Error: errors.New("No such task: " + task.Arn)} } dockerContainer, ok := containerMap[container.Name] if !ok { - return errors.New("No container named '" + 
container.Name + "' created in " + task.Arn) + // TODO type + return DockerContainerMetadata{Error: errors.New("No container named '" + container.Name + "' created in " + task.Arn)} } return engine.client.StopContainer(dockerContainer.DockerId) @@ -636,3 +652,52 @@ func (engine *DockerTaskEngine) Version() (string, error) { } return engine.client.Version() } + +func (engine *DockerTaskEngine) transitionFunctionMap() map[api.ContainerStatus]transitionApplyFunc { + return map[api.ContainerStatus]transitionApplyFunc{ + api.ContainerPulled: engine.pullContainer, + api.ContainerCreated: engine.createContainer, + api.ContainerRunning: engine.startContainer, + api.ContainerStopped: engine.stopContainer, + } +} + +func (engine *DockerTaskEngine) applyContainerState(task *api.Task, container *api.Container) (api.ContainerStatus, DockerContainerMetadata) { + clog := log.New("task", task, "container", container) + if container.KnownStatus == container.DesiredStatus { + clog.Debug("Container at desired status", "desired", container.DesiredStatus) + return api.ContainerStatusNone, DockerContainerMetadata{} + } + if container.KnownStatus > container.DesiredStatus { + clog.Debug("Container past desired status") + return api.ContainerStatusNone, DockerContainerMetadata{} + } + if !dependencygraph.DependenciesAreResolved(container, task.Containers) { + clog.Debug("Can't apply state to container yet; dependencies unresolved", "state", container.DesiredStatus) + return api.ContainerStatusNone, DockerContainerMetadata{} + } + // If we got here, the KnownStatus < DesiredStatus + + var nextState api.ContainerStatus + if container.DesiredTerminal() { + nextState = api.ContainerStopped + } else { + nextState = container.KnownStatus + 1 + } + + transitionFunction, ok := engine.transitionFunctionMap()[nextState] + if !ok { + clog.Crit("Container desired to transition to an unsupported state", "state", nextState.String()) + // TODO THIS NEEDS TO BE A REAL UNRETRIABLE ERROR + return api.ContainerStatusNone, DockerContainerMetadata{Error: errors.New("Things went to crap :(")} + } + + metadata := tryApplyTransition(task, container, nextState, transitionFunction) + if metadata.Error != nil { + clog.Info("Error transitioning container", "state", nextState.String()) + } else { + clog.Debug("Transitioned container", "state", nextState.String()) + engine.saver.Save() + } + return nextState, metadata +} diff --git a/agent/engine/dockerauth/ecs/ecs_credentials.go b/agent/engine/dockerauth/ecs/ecs_credentials.go index 1582043c908..08bbbeba50f 100644 --- a/agent/engine/dockerauth/ecs/ecs_credentials.go +++ b/agent/engine/dockerauth/ecs/ecs_credentials.go @@ -56,6 +56,7 @@ func (creds *ecsCredentials) Provide() credentialprovider.DockerConfig { if err != nil { log.Error("Unable to decode provided docker credentials", "type", creds.EngineAuthType) } + case "": default: log.Error("Unrecognized AuthType type") } diff --git a/agent/engine/dockerstate/docker_task_engine_state.go b/agent/engine/dockerstate/docker_task_engine_state.go index 6b0377a6552..f1830d3ee38 100644 --- a/agent/engine/dockerstate/docker_task_engine_state.go +++ b/agent/engine/dockerstate/docker_task_engine_state.go @@ -81,25 +81,14 @@ func (state *DockerTaskEngineState) TaskById(cid string) (*api.Task, bool) { return state.TaskByArn(arn) } -// AddOrUpdate task adds a new task to the state, and if it already exists -// updates the existing task to match the argument's DesiredStatus. This method -// *does* aquire a write lock. 
-func (state *DockerTaskEngineState) AddOrUpdateTask(task *api.Task) *api.Task { +// AddTask adds a new task to the state +func (state *DockerTaskEngineState) AddTask(task *api.Task) { state.Lock() defer state.Unlock() - current, exists := state.tasks[task.Arn] - if !exists { - state.tasks[task.Arn] = task - return task - } - - // Update - if task.DesiredStatus > current.DesiredStatus { - current.DesiredStatus = task.DesiredStatus - } + state.tasks[task.Arn] = task - return current + return } // RemoveTask removes a task from this state. It removes all containers and diff --git a/agent/engine/dockerstate/dockerstate_test.go b/agent/engine/dockerstate/dockerstate_test.go index 7e5792dfc35..73181eee1b8 100644 --- a/agent/engine/dockerstate/dockerstate_test.go +++ b/agent/engine/dockerstate/dockerstate_test.go @@ -43,7 +43,7 @@ func TestAddTask(t *testing.T) { state := NewDockerTaskEngineState() testTask := &api.Task{Arn: "test"} - state.AddOrUpdateTask(testTask) + state.AddTask(testTask) if len(state.AllTasks()) != 1 { t.Error("Should have 1 task") @@ -63,7 +63,7 @@ func TestTwophaseAddContainer(t *testing.T) { testTask := &api.Task{Arn: "test", Containers: []*api.Container{&api.Container{ Name: "testContainer", }}} - state.AddOrUpdateTask(testTask) + state.AddTask(testTask) state.AddContainer(&api.DockerContainer{DockerName: "dockerName", Container: testTask.Containers[0]}, testTask) @@ -136,7 +136,7 @@ func TestRemoveTask(t *testing.T) { Containers: []*api.Container{testContainer}, } - state.AddOrUpdateTask(testTask) + state.AddTask(testTask) state.AddContainer(testDockerContainer, testTask) tasks := state.AllTasks() diff --git a/agent/engine/dockerstate/json.go b/agent/engine/dockerstate/json.go index fced3fed56a..69db352b554 100644 --- a/agent/engine/dockerstate/json.go +++ b/agent/engine/dockerstate/json.go @@ -53,7 +53,7 @@ func (state *DockerTaskEngineState) UnmarshalJSON(data []byte) error { clean := NewDockerTaskEngineState() for _, task := range saved.Tasks { - clean.AddOrUpdateTask(task) + clean.AddTask(task) } for id, container := range saved.IdToContainer { taskArn, ok := saved.IdToTask[id] diff --git a/agent/engine/dockerstate/testutils/json_test.go b/agent/engine/dockerstate/testutils/json_test.go index 39bd9b6da47..5c2d58799c2 100644 --- a/agent/engine/dockerstate/testutils/json_test.go +++ b/agent/engine/dockerstate/testutils/json_test.go @@ -70,7 +70,7 @@ func TestJsonEncoding(t *testing.T) { testState := dockerstate.NewDockerTaskEngineState() testTask := createTestTask("test1", 1) - testState.AddOrUpdateTask(testTask) + testState.AddTask(testTask) for i, cont := range testTask.Containers { testState.AddContainer(&api.DockerContainer{DockerId: "docker" + strconv.Itoa(i), DockerName: "someName", Container: cont}, testTask) } diff --git a/agent/engine/engine_integ_test.go b/agent/engine/engine_integ_test.go index dedf55a83c8..7954574e37c 100644 --- a/agent/engine/engine_integ_test.go +++ b/agent/engine/engine_integ_test.go @@ -47,6 +47,22 @@ func init() { ttime.SetTime(test_time) } +func setup(t *testing.T) TaskEngine { + te := NewTaskEngine(cfg) + te.Init() + if testing.Short() { + t.Skip("Skipping integ test in short mode") + } + if _, err := os.Stat("/var/run/docker.sock"); err != nil { + t.Skip("Docker not running") + } + if os.Getenv("ECS_SKIP_ENGINE_INTEG_TEST") != "" { + t.Skip("ECS_SKIP_ENGINE_INTEG_TEST") + } + + return te +} + func createTestContainer() *api.Container { return &api.Container{ Name: "netcat", @@ -115,13 +131,10 @@ func runProxyAuthRegistry() { })) } 
-var taskEngine TaskEngine var cfg *config.Config func init() { cfg, _ = config.NewConfig() - taskEngine = NewTaskEngine(cfg) - taskEngine.Init() go runProxyAuthRegistry() } @@ -137,32 +150,31 @@ func removeImage(img string) { // TestStartStopUnpulledImage ensures that an unpulled image is successfully // pulled, run, and stopped via docker. func TestStartStopUnpulledImage(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } - + taskEngine := setup(t) // Ensure this image isn't pulled by deleting it removeImage(testRegistryImage) testTask := createTestTask("testStartUnpulled") - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - expected_events := []api.TaskStatus{api.TaskCreated, api.TaskRunning, api.TaskStopped} + expected_events := []api.TaskStatus{api.TaskRunning, api.TaskStopped} - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } expected_event := expected_events[0] expected_events = expected_events[1:] - if task_event.TaskStatus != expected_event { - t.Error("Got event " + task_event.TaskStatus.String() + " but expected " + expected_event.String()) + if taskEvent.Status != expected_event { + t.Error("Got event " + taskEvent.Status.String() + " but expected " + expected_event.String()) } if len(expected_events) == 0 { break @@ -174,30 +186,30 @@ func TestStartStopUnpulledImage(t *testing.T) { // 24751 and verifies that when you do forward the port you can access it and if // you don't forward the port you can't func TestPortForward(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() testArn := "testPortForwardFail" testTask := createTestTask(testArn) testTask.Containers[0].Command = []string{"-l=24751", "-serve", "ecs test container"} // Port not forwarded; verify we can't access it - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskRunning { + if taskEvent.Status == api.TaskRunning { break - } else if task_event.TaskStatus > api.TaskRunning { - t.Fatal("Task went straight to " + task_event.TaskStatus.String() + " without running") + } else if taskEvent.Status > api.TaskRunning { + t.Fatal("Task went straight to " + taskEvent.Status.String() + " without running") } } _, err := net.DialTimeout("tcp", "127.0.0.1:24751", 20*time.Millisecond) @@ -213,11 +225,11 @@ func TestPortForward(t *testing.T) { if err != nil { t.Error("Could not kill container", err) } - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus >= api.TaskStopped { + if taskEvent.Status >= api.TaskStopped { break } } @@ -228,16 +240,16 @@ func TestPortForward(t 
*testing.T) { testTask.Containers[0].Command = []string{"-l=24751", "-serve", "ecs test container"} testTask.Containers[0].Ports = []api.PortBinding{api.PortBinding{ContainerPort: 24751, HostPort: 24751}} - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskRunning { + if taskEvent.Status == api.TaskRunning { break - } else if task_event.TaskStatus > api.TaskRunning { - t.Fatal("Task went straight to " + task_event.TaskStatus.String() + " without running") + } else if taskEvent.Status > api.TaskRunning { + t.Fatal("Task went straight to " + taskEvent.Status.String() + " without running") } } @@ -257,13 +269,14 @@ func TestPortForward(t *testing.T) { } // Stop the existing container now - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } @@ -272,35 +285,36 @@ func TestPortForward(t *testing.T) { // TestMultiplePortForwards tests that two links containers in the same task can // both expose ports successfully func TestMultiplePortForwards(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, containerEvents := taskEngine.TaskEvents() + go func() { + for { + <-containerEvents + } + }() // Forward it and make sure that works testArn := "testMultiplePortForwards" testTask := createTestTask(testArn) testTask.Containers[0].Command = []string{"-l=24751", "-serve", "ecs test container1"} testTask.Containers[0].Ports = []api.PortBinding{api.PortBinding{ContainerPort: 24751, HostPort: 24751}} + testTask.Containers[0].Essential = false testTask.Containers = append(testTask.Containers, createTestContainer()) testTask.Containers[1].Name = "nc2" testTask.Containers[1].Command = []string{"-l=24751", "-serve", "ecs test container2"} testTask.Containers[1].Ports = []api.PortBinding{api.PortBinding{ContainerPort: 24751, HostPort: 24752}} - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskRunning { + if taskEvent.Status == api.TaskRunning { break - } else if task_event.TaskStatus > api.TaskRunning { - t.Fatal("Task went straight to " + task_event.TaskStatus.String() + " without running") + } else if taskEvent.Status > api.TaskRunning { + t.Fatal("Task went straight to " + taskEvent.Status.String() + " without running") } } @@ -310,27 +324,31 @@ func TestMultiplePortForwards(t *testing.T) { if err != nil { t.Fatal("Error dialing simple container 1 " + err.Error()) } + t.Log("Dialed first container") response, _ := ioutil.ReadAll(conn) if string(response) != "ecs test container1" { t.Error("Got response: " + string(response) + " instead of 'ecs test container1'") } + 
t.Log("Read first container") conn, err = net.DialTimeout("tcp", "127.0.0.1:24752", 20*time.Millisecond) if err != nil { t.Fatal("Error dialing simple container 2 " + err.Error()) } + t.Log("Dialed second container") response, _ = ioutil.ReadAll(conn) if string(response) != "ecs test container2" { t.Error("Got response: " + string(response) + " instead of 'ecs test container2'") } + t.Log("Read second container") - // Kill the existing container now - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } @@ -339,14 +357,9 @@ func TestMultiplePortForwards(t *testing.T) { // TestDynamicPortForward runs a container serving data on a port chosen by the // docker deamon and verifies that the port is reported in the state-change func TestDynamicPortForward(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() testArn := "testDynamicPortForward" testTask := createTestTask(testArn) @@ -354,20 +367,26 @@ func TestDynamicPortForward(t *testing.T) { // No HostPort = docker should pick testTask.Containers[0].Ports = []api.PortBinding{api.PortBinding{ContainerPort: 24751}} - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) var portBindings []api.PortBinding - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for contEvent := range contEvents { + if contEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskRunning { - portBindings = task_event.PortBindings + if contEvent.Status == api.ContainerRunning { + portBindings = contEvent.PortBindings break - } else if task_event.TaskStatus > api.TaskRunning { - t.Fatal("Task went straight to " + task_event.TaskStatus.String() + " without running") + } else if contEvent.Status > api.ContainerRunning { + t.Fatal("Container went straight to " + contEvent.Status.String() + " without running") } } + // discard other container events + go func() { + for { + <-contEvents + } + }() if len(portBindings) != 1 { t.Error("PortBindings was not set; should have been len 1", portBindings) @@ -395,27 +414,23 @@ func TestDynamicPortForward(t *testing.T) { } // Kill the existing container now - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } } func TestMultipleDynamicPortForward(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := 
taskEngine.TaskEvents() testArn := "testDynamicPortForward2" testTask := createTestTask(testArn) @@ -423,20 +438,25 @@ func TestMultipleDynamicPortForward(t *testing.T) { // No HostPort or 0 hostport; docker should pick two ports for us testTask.Containers[0].Ports = []api.PortBinding{api.PortBinding{ContainerPort: 24751}, api.PortBinding{ContainerPort: 24751, HostPort: 0}} - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) var portBindings []api.PortBinding - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for contEvent := range contEvents { + if contEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskRunning { - portBindings = task_event.PortBindings + if contEvent.Status == api.ContainerRunning { + portBindings = contEvent.PortBindings break - } else if task_event.TaskStatus > api.TaskRunning { - t.Fatal("Task went straight to " + task_event.TaskStatus.String() + " without running") + } else if contEvent.Status > api.ContainerRunning { + t.Fatal("Task went straight to " + contEvent.Status.String() + " without running") } } + go func() { + for { + <-contEvents + } + }() if len(portBindings) != 2 { t.Error("Could not bind to two ports from one container port", portBindings) @@ -484,13 +504,14 @@ func TestMultipleDynamicPortForward(t *testing.T) { } // Kill the existing container now - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } @@ -501,12 +522,7 @@ func TestMultipleDynamicPortForward(t *testing.T) { // prints "hello linker" and then links a container that proxies that data to // a publicly exposed port, where the tests reads it func TestLinking(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) testTask := createTestTask("TestLinking") testTask.Containers = append(testTask.Containers, createTestContainer()) @@ -516,18 +532,23 @@ func TestLinking(t *testing.T) { testTask.Containers[1].Links = []string{"linkee:linkee_alias"} testTask.Containers[1].Ports = []api.PortBinding{api.PortBinding{ContainerPort: 24751, HostPort: 24751}} - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskRunning { + if taskEvent.Status == api.TaskRunning { break - } else if task_event.TaskStatus > api.TaskRunning { - t.Fatal("Task went straight to " + task_event.TaskStatus.String() + " without running") + } else if taskEvent.Status > api.TaskRunning { + t.Fatal("Task went straight to " + taskEvent.Status.String() + " without running") } } @@ -546,26 +567,22 @@ func TestLinking(t *testing.T) { t.Error("Got response: " + string(response) + " instead of 'hello linker'") } - testTask.DesiredStatus = api.TaskStopped - go 
taskEngine.AddTask(testTask) + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } } func TestDockerCfgAuth(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) removeImage(testAuthRegistryImage) authString := base64.StdEncoding.EncodeToString([]byte(testAuthUser + ":" + testAuthPass)) @@ -579,32 +596,38 @@ func TestDockerCfgAuth(t *testing.T) { testTask := createTestTask("testDockerCfgAuth") testTask.Containers[0].Image = testAuthRegistryImage - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - expected_events := []api.TaskStatus{api.TaskCreated, api.TaskRunning} + expected_events := []api.TaskStatus{api.TaskRunning} - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } expected_event := expected_events[0] expected_events = expected_events[1:] - if task_event.TaskStatus != expected_event { - t.Error("Got event " + task_event.TaskStatus.String() + " but expected " + expected_event.String()) + if taskEvent.Status != expected_event { + t.Error("Got event " + taskEvent.Status.String() + " but expected " + expected_event.String()) } if len(expected_events) == 0 { break } } - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn == testTask.Arn { - if !(task_event.TaskStatus >= api.TaskStopped) { - t.Error("Expected only terminal events; got " + task_event.TaskStatus.String()) + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) + for taskEvent := range taskEvents { + if taskEvent.TaskArn == testTask.Arn { + if !(taskEvent.Status >= api.TaskStopped) { + t.Error("Expected only terminal events; got " + taskEvent.Status.String()) } break } @@ -612,12 +635,7 @@ func TestDockerCfgAuth(t *testing.T) { } func TestDockerAuth(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) removeImage(testAuthRegistryImage) cfg.EngineAuthData = []byte(`{"http://` + testAuthRegistryHost + `":{"username":"` + testAuthUser + `","password":"` + testAuthPass + `"}}`) @@ -630,32 +648,38 @@ func TestDockerAuth(t *testing.T) { testTask := createTestTask("testDockerAuth") testTask.Containers[0].Image = testAuthRegistryImage - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - expected_events := []api.TaskStatus{api.TaskCreated, api.TaskRunning} + expected_events := []api.TaskStatus{api.TaskRunning} - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != 
testTask.Arn { continue } expected_event := expected_events[0] expected_events = expected_events[1:] - if task_event.TaskStatus != expected_event { - t.Error("Got event " + task_event.TaskStatus.String() + " but expected " + expected_event.String()) + if taskEvent.Status != expected_event { + t.Error("Got event " + taskEvent.Status.String() + " but expected " + expected_event.String()) } if len(expected_events) == 0 { break } } - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn == testTask.Arn { - if !(task_event.TaskStatus >= api.TaskStopped) { - t.Error("Expected only terminal events; got " + task_event.TaskStatus.String()) + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) + for taskEvent := range taskEvents { + if taskEvent.TaskArn == testTask.Arn { + if !(taskEvent.Status >= api.TaskStopped) { + t.Error("Expected only terminal events; got " + taskEvent.Status.String()) } break } @@ -663,14 +687,14 @@ func TestDockerAuth(t *testing.T) { } func TestVolumesFrom(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() testTask := createTestTask("testVolumeContainer") testTask.Containers[0].Image = testVolumeImage @@ -681,13 +705,13 @@ func TestVolumesFrom(t *testing.T) { testTask.Containers[1].Command = []string{"cat /data/test-file | nc -l -p 80"} testTask.Containers[1].Ports = []api.PortBinding{api.PortBinding{ContainerPort: 80, HostPort: 24751}} - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskRunning { + if taskEvent.Status == api.TaskRunning { break } } @@ -705,28 +729,29 @@ func TestVolumesFrom(t *testing.T) { t.Error("Got response: " + strings.TrimSpace(string(response)) + " instead of 'test'") } - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) + taskUpdate := *testTask + taskUpdate.DesiredStatus = api.TaskStopped + taskEngine.AddTask(&taskUpdate) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } } func TestVolumesFromRO(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() testTask := createTestTask("testVolumeROContainer") testTask.Containers[0].Image = testVolumeImage @@ -734,7 +759,7 @@ func TestVolumesFromRO(t *testing.T) { cont := createTestContainer() cont.Name = "test" + strconv.Itoa(i) cont.Image = testVolumeImage - cont.Essential = false + cont.Essential = i > 0 testTask.Containers = append(testTask.Containers, cont) } testTask.Containers[1].VolumesFrom = 
[]api.VolumeFrom{api.VolumeFrom{SourceContainer: testTask.Containers[0].Name, ReadOnly: true}} @@ -744,16 +769,12 @@ func TestVolumesFromRO(t *testing.T) { testTask.Containers[3].VolumesFrom = []api.VolumeFrom{api.VolumeFrom{SourceContainer: testTask.Containers[0].Name, ReadOnly: false}} testTask.Containers[3].Command = []string{"touch /data/notreadonly-fs-2 || exit 42"} - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) -WaitStopped: - for _ = range task_events { - for i := 1; i <= 3; i++ { - if testTask.Containers[i].KnownStatus < api.ContainerStopped { - continue WaitStopped - } + for taskEvent := range taskEvents { + if taskEvent.Status == api.TaskStopped { + break } - break } if testTask.Containers[1].KnownExitCode == nil || *testTask.Containers[1].KnownExitCode != 42 { @@ -765,29 +786,17 @@ WaitStopped: if testTask.Containers[3].KnownExitCode == nil || *testTask.Containers[3].KnownExitCode != 0 { t.Error("Couldn't touch with explicit rw") } - - testTask.DesiredStatus = api.TaskStopped - go taskEngine.AddTask(testTask) - - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { - continue - } - if task_event.TaskStatus == api.TaskStopped { - break - } - } } func TestHostVolumeMount(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() tmpPath, _ := ioutil.TempDir("", "ecs_volume_test") defer os.RemoveAll(tmpPath) @@ -798,13 +807,13 @@ func TestHostVolumeMount(t *testing.T) { testTask.Containers[0].Image = testVolumeImage testTask.Containers[0].MountPoints = []api.MountPoint{api.MountPoint{ContainerPath: "/host/tmp", SourceVolume: "test-tmp"}} testTask.Containers[0].Command = []string{`echo -n "hi" > /host/tmp/hello-from-container; if [[ "$(cat /host/tmp/test-file)" != "test-data" ]]; then exit 4; fi; exit 42`} - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } @@ -819,14 +828,14 @@ func TestHostVolumeMount(t *testing.T) { } func TestEmptyHostVolumeMount(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() testTask := createTestTask("testEmptyHostVolumeMount") testTask.Volumes = []api.TaskVolume{api.TaskVolume{Name: "test-tmp", Volume: &api.EmptyHostVolume{}}} @@ -839,13 +848,13 @@ func TestEmptyHostVolumeMount(t *testing.T) { testTask.Containers[1].MountPoints = []api.MountPoint{api.MountPoint{ContainerPath: "/alsoempty/", SourceVolume: "test-tmp"}} testTask.Containers[1].Command = []string{`touch /alsoempty/file`} testTask.Containers[1].Essential = false - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { 
continue } - if task_event.TaskStatus == api.TaskStopped { + if taskEvent.Status == api.TaskStopped { break } } @@ -856,29 +865,29 @@ func TestEmptyHostVolumeMount(t *testing.T) { } func TestSweepContainer(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integ test in short mode") - } - if _, err := os.Stat("/var/run/docker.sock"); err != nil { - t.Skip("Docker not running") - } + taskEngine := setup(t) - task_events := taskEngine.TaskEvents() + taskEvents, contEvents := taskEngine.TaskEvents() + go func() { + for { + <-contEvents + } + }() testTask := createTestTask("testSweepContainer") - go taskEngine.AddTask(testTask) + taskEngine.AddTask(testTask) - expected_events := []api.TaskStatus{api.TaskCreated, api.TaskRunning, api.TaskStopped} + expected_events := []api.TaskStatus{api.TaskRunning, api.TaskStopped} - for task_event := range task_events { - if task_event.TaskArn != testTask.Arn { + for taskEvent := range taskEvents { + if taskEvent.TaskArn != testTask.Arn { continue } expected_event := expected_events[0] expected_events = expected_events[1:] - if task_event.TaskStatus != expected_event { - t.Error("Got event " + task_event.TaskStatus.String() + " but expected " + expected_event.String()) + if taskEvent.Status != expected_event { + t.Error("Got event " + taskEvent.Status.String() + " but expected " + expected_event.String()) } if len(expected_events) == 0 { break diff --git a/agent/engine/errors.go b/agent/engine/errors.go new file mode 100644 index 00000000000..00a22ef62d0 --- /dev/null +++ b/agent/engine/errors.go @@ -0,0 +1 @@ +package engine diff --git a/agent/engine/interface.go b/agent/engine/interface.go index 0eb43ade7e6..e05d9811d09 100644 --- a/agent/engine/interface.go +++ b/agent/engine/interface.go @@ -26,7 +26,7 @@ type TaskEngine interface { // this task engine from processing new tasks Disable() - TaskEvents() <-chan api.ContainerStateChange + TaskEvents() (<-chan api.TaskStateChange, <-chan api.ContainerStateChange) SetSaver(statemanager.Saver) // AddTask adds a new task to the task engine and manages its container's diff --git a/agent/engine/mocks/engine_mocks.go b/agent/engine/mocks/engine_mocks.go index dbf375cde29..501ed654648 100644 --- a/agent/engine/mocks/engine_mocks.go +++ b/agent/engine/mocks/engine_mocks.go @@ -111,10 +111,11 @@ func (_mr *_MockTaskEngineRecorder) SetSaver(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "SetSaver", arg0) } -func (_m *MockTaskEngine) TaskEvents() <-chan api.ContainerStateChange { +func (_m *MockTaskEngine) TaskEvents() (<-chan api.TaskStateChange, <-chan api.ContainerStateChange) { ret := _m.ctrl.Call(_m, "TaskEvents") - ret0, _ := ret[0].(<-chan api.ContainerStateChange) - return ret0 + ret0, _ := ret[0].(<-chan api.TaskStateChange) + ret1, _ := ret[1].(<-chan api.ContainerStateChange) + return ret0, ret1 } func (_mr *_MockTaskEngineRecorder) TaskEvents() *gomock.Call { @@ -174,11 +175,10 @@ func (_mr *_MockDockerClientRecorder) ContainerEvents() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "ContainerEvents") } -func (_m *MockDockerClient) CreateContainer(_param0 *go_dockerclient.Config, _param1 string) (string, error) { +func (_m *MockDockerClient) CreateContainer(_param0 *go_dockerclient.Config, _param1 string) engine.DockerContainerMetadata { ret := _m.ctrl.Call(_m, "CreateContainer", _param0, _param1) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(engine.DockerContainerMetadata) + return ret0 } func (_mr 
*_MockDockerClientRecorder) CreateContainer(arg0, arg1 interface{}) *gomock.Call { @@ -218,9 +218,9 @@ func (_mr *_MockDockerClientRecorder) InspectContainer(arg0 interface{}) *gomock return _mr.mock.ctrl.RecordCall(_mr.mock, "InspectContainer", arg0) } -func (_m *MockDockerClient) PullImage(_param0 string) error { +func (_m *MockDockerClient) PullImage(_param0 string) engine.DockerContainerMetadata { ret := _m.ctrl.Call(_m, "PullImage", _param0) - ret0, _ := ret[0].(error) + ret0, _ := ret[0].(engine.DockerContainerMetadata) return ret0 } @@ -238,9 +238,9 @@ func (_mr *_MockDockerClientRecorder) RemoveContainer(arg0 interface{}) *gomock. return _mr.mock.ctrl.RecordCall(_mr.mock, "RemoveContainer", arg0) } -func (_m *MockDockerClient) StartContainer(_param0 string, _param1 *go_dockerclient.HostConfig) error { +func (_m *MockDockerClient) StartContainer(_param0 string, _param1 *go_dockerclient.HostConfig) engine.DockerContainerMetadata { ret := _m.ctrl.Call(_m, "StartContainer", _param0, _param1) - ret0, _ := ret[0].(error) + ret0, _ := ret[0].(engine.DockerContainerMetadata) return ret0 } @@ -248,9 +248,9 @@ func (_mr *_MockDockerClientRecorder) StartContainer(arg0, arg1 interface{}) *go return _mr.mock.ctrl.RecordCall(_mr.mock, "StartContainer", arg0, arg1) } -func (_m *MockDockerClient) StopContainer(_param0 string) error { +func (_m *MockDockerClient) StopContainer(_param0 string) engine.DockerContainerMetadata { ret := _m.ctrl.Call(_m, "StopContainer", _param0) - ret0, _ := ret[0].(error) + ret0, _ := ret[0].(engine.DockerContainerMetadata) return ret0 } diff --git a/agent/engine/types.go b/agent/engine/types.go index f3f28bab114..3b41617ff55 100644 --- a/agent/engine/types.go +++ b/agent/engine/types.go @@ -26,7 +26,15 @@ func (cnferror ContainerNotFound) Error() string { } type DockerContainerChangeEvent struct { - DockerId string - Image string - Status api.ContainerStatus + Status api.ContainerStatus + + DockerContainerMetadata +} + +type DockerContainerMetadata struct { + DockerId string + ExitCode *int + PortBindings []api.PortBinding + Error error + Volumes map[string]string } diff --git a/agent/eventhandler/handler.go b/agent/eventhandler/handler.go index 208ad9821d3..92b2de86446 100644 --- a/agent/eventhandler/handler.go +++ b/agent/eventhandler/handler.go @@ -29,18 +29,26 @@ var statesaver statemanager.Saver = statemanager.NewNoopStateManager() func HandleEngineEvents(taskEngine engine.TaskEngine, client api.ECSClient, saver statemanager.Saver) { statesaver = saver for { - task_events := taskEngine.TaskEvents() + taskEvents, containerEvents := taskEngine.TaskEvents() - for task_events != nil { + for taskEvents != nil && containerEvents != nil { select { - case event, open := <-task_events: + case event, open := <-taskEvents: if !open { - task_events = nil - log.Error("Task events closed; this should not happen") + taskEvents = nil + log.Crit("Task events closed") break } go AddTaskEvent(event, client) + case event, open := <-containerEvents: + if !open { + containerEvents = nil + log.Error("Container events closed") + break + } + + go AddContainerEvent(event, client) } } } diff --git a/agent/eventhandler/handler_test.go b/agent/eventhandler/handler_test.go index 75a21423973..5646b211e10 100644 --- a/agent/eventhandler/handler_test.go +++ b/agent/eventhandler/handler_test.go @@ -15,6 +15,7 @@ package eventhandler import ( "errors" + "fmt" "strconv" "sync/atomic" "testing" @@ -27,11 +28,12 @@ import ( "github.com/aws/amazon-ecs-agent/agent/utils" ) -type changeFn func(change 
api.ContainerStateChange) utils.RetriableError +type containerChangeFn func(change api.ContainerStateChange) utils.RetriableError +type taskChangeFn func(change api.TaskStateChange) utils.RetriableError type MockECSClient struct { - submitTaskStateChange changeFn - submitContainerStateChange changeFn + submitTaskStateChange taskChangeFn + submitContainerStateChange containerChangeFn } func (m *MockECSClient) CredentialProvider() credentials.AWSCredentialProvider { @@ -43,40 +45,37 @@ func (m *MockECSClient) RegisterContainerInstance() (string, error) { func (m *MockECSClient) DiscoverPollEndpoint(string) (string, error) { return "", nil } -func (m *MockECSClient) SubmitTaskStateChange(change api.ContainerStateChange) utils.RetriableError { +func (m *MockECSClient) SubmitTaskStateChange(change api.TaskStateChange) utils.RetriableError { return m.submitTaskStateChange(change) } func (m *MockECSClient) SubmitContainerStateChange(change api.ContainerStateChange) utils.RetriableError { return m.submitContainerStateChange(change) } -func mockClient(task, cont changeFn) api.ECSClient { +func mockClient(task taskChangeFn, cont containerChangeFn) api.ECSClient { return &MockECSClient{ task, cont, } } func contEvent(arn string) api.ContainerStateChange { - cont := &api.Container{SentStatus: api.ContainerStatusNone} - return api.ContainerStateChange{TaskArn: arn, Status: api.ContainerRunning, Container: cont} + return api.ContainerStateChange{TaskArn: arn, Status: api.ContainerRunning} } -func taskEvent(arn string) api.ContainerStateChange { - cont := &api.Container{SentStatus: api.ContainerStatusNone} - task := &api.Task{SentStatus: api.TaskStatusNone} - return api.ContainerStateChange{TaskArn: arn, Status: api.ContainerRunning, TaskStatus: api.TaskRunning, Task: task, Container: cont} +func taskEvent(arn string) api.TaskStateChange { + return api.TaskStateChange{TaskArn: arn, Status: api.TaskRunning} } func TestSendsEvents(t *testing.T) { // These channels will submit "successful" state changes back to the test - taskStatus := make(chan api.ContainerStateChange) + taskStatus := make(chan api.TaskStateChange) contStatus := make(chan api.ContainerStateChange) // These counters let us know how many errors have happened of each type - var taskRetriableErrors, contRetriableErrors, taskUnretriableErrors, contUnretriableErrors, taskErrors, contErrors int32 + var taskRetriableErrors, contRetriableErrors, taskUnretriableErrors, contUnretriableErrors, taskCalls, contCalls int32 resetCounters := func() { - taskErrors = 0 - contErrors = 0 + taskCalls = 0 + contCalls = 0 taskRetriableErrors = 0 contRetriableErrors = 0 taskUnretriableErrors = 0 @@ -93,8 +92,8 @@ func TestSendsEvents(t *testing.T) { retriable := utils.NewRetriableError(utils.NewRetriable(true), errors.New("test")) client := mockClient( - func(change api.ContainerStateChange) utils.RetriableError { - atomic.AddInt32(&taskErrors, 1) + func(change api.TaskStateChange) utils.RetriableError { + atomic.AddInt32(&taskCalls, 1) err := <-taskError if err == nil { taskStatus <- change @@ -109,7 +108,7 @@ func TestSendsEvents(t *testing.T) { return err }, func(change api.ContainerStateChange) utils.RetriableError { - atomic.AddInt32(&contErrors, 1) + atomic.AddInt32(&contCalls, 1) err := <-contError if err == nil { contStatus <- change @@ -124,18 +123,19 @@ func TestSendsEvents(t *testing.T) { }, ) - // Trivial: one task/container, no errors + // Trivial: one container, no errors - AddTaskEvent(contEvent("1"), client) + AddContainerEvent(contEvent("1"), 
client) go func() { contError <- nil }() sent := <-contStatus - if sent.TaskArn != "1" { + if sent.TaskArn != "1" || sent.Status != api.ContainerRunning { t.Error("Sent event did not match added event") } + AddContainerEvent(contEvent("2"), client) AddTaskEvent(taskEvent("2"), client) go func() { contError <- nil @@ -153,14 +153,22 @@ func TestSendsEvents(t *testing.T) { } } - sent = <-taskStatus - if sent.TaskArn != "2" { + tsent := <-taskStatus + if tsent.TaskArn != "2" { t.Error("Wrong task submitted") } + select { + case <-contStatus: + t.Error("event should have been replaced") + case <-taskStatus: + t.Error("There should be no pending taskStatus events") + default: + } + // Now a little more complicated; 1 event with retries resetCounters() - AddTaskEvent(contEvent("3"), client) + AddContainerEvent(contEvent("3"), client) go func() { contError <- retriable contError <- nil @@ -174,20 +182,30 @@ func TestSendsEvents(t *testing.T) { if sent.TaskArn != "3" { t.Error("Wrong task submitted") } - if contRetriableErrors != 1 && contErrors != 2 { + if contRetriableErrors != 1 && contCalls != 2 { t.Error("Didn't get the expected number of errors") } + select { + case <-contStatus: + t.Error("event should have been replaced") + case <-taskStatus: + t.Error("There should be no pending taskStatus events") + default: + } + resetCounters() // Test concurrency; ensure it doesn't attempt to send more than - // CONCURRENT_EVENT_CALLS at once + // concurrentEventCalls at once // Put on N+1 events - for i := 0; i < CONCURRENT_EVENT_CALLS+1; i++ { - AddTaskEvent(contEvent("concurrent_"+strconv.Itoa(i)), client) + for i := 0; i < concurrentEventCalls+1; i++ { + AddContainerEvent(contEvent("concurrent_"+strconv.Itoa(i)), client) } // N events should be waiting for potential errors; verify this is so time.Sleep(5 * time.Millisecond) - if contErrors != CONCURRENT_EVENT_CALLS { + if contCalls != concurrentEventCalls { + fmt.Println(contCalls) + fmt.Println(concurrentEventCalls) t.Error("Too many event calls got through concurrently") } // Let one through @@ -197,26 +215,26 @@ func TestSendsEvents(t *testing.T) { <-contStatus time.Sleep(5 * time.Millisecond) - if contErrors != CONCURRENT_EVENT_CALLS+1 { + if contCalls != concurrentEventCalls+1 { t.Error("Another concurrent call didn't start when expected") } // let through the rest - for i := 0; i < CONCURRENT_EVENT_CALLS; i++ { + for i := 0; i < concurrentEventCalls; i++ { go func() { contError <- nil }() <-contStatus } time.Sleep(5 * time.Millisecond) - if contErrors != CONCURRENT_EVENT_CALLS+1 { + if contCalls != concurrentEventCalls+1 { t.Error("Somehow extra concurrenct calls appeared from nowhere") } // Test container event replacement doesn't happen - AddTaskEvent(contEvent("notreplaced1"), client) + AddContainerEvent(contEvent("notreplaced1"), client) sortaRedundant := contEvent("notreplaced1") sortaRedundant.Status = api.ContainerStopped - AddTaskEvent(sortaRedundant, client) + AddContainerEvent(sortaRedundant, client) go func() { contError <- nil contError <- retriable @@ -248,11 +266,14 @@ func TestSendsEvents(t *testing.T) { } // Test task event replacement doesn't happen + AddContainerEvent(contEvent("notreplaced2"), client) AddTaskEvent(taskEvent("notreplaced2"), client) - sortaRedundant = taskEvent("notreplaced2") - sortaRedundant.Status = api.ContainerStopped - sortaRedundant.TaskStatus = api.TaskStopped - AddTaskEvent(sortaRedundant, client) + sortaRedundantc := contEvent("notreplaced2") + sortaRedundantc.Status = api.ContainerStopped + 
sortaRedundantt := taskEvent("notreplaced2") + sortaRedundantt.Status = api.TaskStopped + //AddContainerEvent(sortaRedundantc, client) + AddTaskEvent(sortaRedundantt, client) go func() { taskError <- nil @@ -268,29 +289,33 @@ func TestSendsEvents(t *testing.T) { if sent.TaskArn != "notreplaced2" { t.Error("Lost a task or task out of order") } - sent = <-taskStatus - if sent.TaskArn != "notreplaced2" { + tsent = <-taskStatus + if tsent.TaskArn != "notreplaced2" { t.Error("Lost a task or task out of order") } - if sent.TaskStatus != api.TaskRunning { + if tsent.Status != api.TaskRunning { t.Error("Wrong status") } - sent = <-contStatus - if sent.TaskArn != "notreplaced2" { - t.Error("Lost a task or task out of order") - } - sent = <-taskStatus - if sent.TaskArn != "notreplaced2" { + //sent = <-contStatus + //if sent.TaskArn != "notreplaced2" { + // t.Error("Lost a task or task out of order") + //} + tsent = <-taskStatus + if tsent.TaskArn != "notreplaced2" { t.Error("Lost a task or task out of order") } - if sent.TaskStatus != api.TaskStopped { + if tsent.Status != api.TaskStopped { t.Error("Wrong status") } // Verify that a task doesn't get sent if we already have 'sent' it task := taskEvent("alreadySent") - task.Task.SentStatus = api.TaskRunning - task.Container.SentStatus = api.ContainerRunning + taskRunning := api.TaskRunning + task.SentStatus = &taskRunning + cont := contEvent("alreadySent") + containerRunning := api.ContainerRunning + cont.SentStatus = &containerRunning + AddContainerEvent(cont, client) AddTaskEvent(task, client) time.Sleep(5 * time.Millisecond) select { @@ -306,23 +331,26 @@ func TestSendsEvents(t *testing.T) { } task = taskEvent("containerSent") - task.Task.SentStatus = api.TaskStatusNone - task.Container.SentStatus = api.ContainerRunning + taskNone := api.TaskStatusNone + task.SentStatus = &taskNone + cont = contEvent("containerSent") + cont.SentStatus = &containerRunning + AddContainerEvent(cont, client) AddTaskEvent(task, client) // Expect to send a task status but not a container status go func() { taskError <- nil }() - sent = <-taskStatus + tsent = <-taskStatus time.Sleep(5 * time.Millisecond) - if sent.TaskArn != "containerSent" { + if tsent.TaskArn != "containerSent" { t.Error("Wrong arn") } - if sent.TaskStatus != api.TaskRunning { + if tsent.Status != api.TaskRunning { t.Error("Wrong status") } - if task.Task.SentStatus != api.TaskRunning { - t.Error("Status not updated: ", task.Task.SentStatus.String()) + if *task.SentStatus != api.TaskRunning { + t.Error("Status not updated: ") } select { @@ -339,22 +367,15 @@ func TestSendsEvents(t *testing.T) { } func TestShouldBeSent(t *testing.T) { - sendableEvent := newSendableEvent(api.ContainerStateChange{ - Status: api.ContainerStopped, - TaskStatus: api.TaskStatusNone, - Container: &api.Container{}, + sendableEvent := newSendableContainerEvent(api.ContainerStateChange{ + Status: api.ContainerStopped, }) if sendableEvent.taskShouldBeSent() { - t.Error("TasStatusNone should not be sent") + t.Error("Container event should not be sent as a task") } if !sendableEvent.containerShouldBeSent() { t.Error("Container should be sent if it's the first try") } - - sendableEvent = newSendableEvent(api.ContainerStateChange{ - Status: api.ContainerStopped, - TaskStatus: api.TaskStatusNone, - }) } diff --git a/agent/eventhandler/task_handler.go b/agent/eventhandler/task_handler.go index 30831eab890..56591624d36 100644 --- a/agent/eventhandler/task_handler.go +++ b/agent/eventhandler/task_handler.go @@ -27,9 +27,17 @@ func init() { 
handler = newTaskHandler() } +func AddTaskEvent(change api.TaskStateChange, client api.ECSClient) { + addEvent(newSendableTaskEvent(change), client) +} + +func AddContainerEvent(change api.ContainerStateChange, client api.ECSClient) { + addEvent(newSendableContainerEvent(change), client) +} + // Prepares a given event to be sent by adding it to the handler's appropriate // eventList -func AddTaskEvent(change api.ContainerStateChange, client api.ECSClient) { +func addEvent(change *sendableEvent, client api.ECSClient) { var taskList *eventList var preexisting bool log.Info("Adding event", "change", change) @@ -39,11 +47,11 @@ func AddTaskEvent(change api.ContainerStateChange, client api.ECSClient) { handler.Lock() defer handler.Unlock() - taskList, preexisting = handler.taskMap[change.TaskArn] + taskList, preexisting = handler.taskMap[change.taskArn()] if !preexisting { log.Debug("New event", "change", change) taskList = &eventList{List: list.New(), sending: false} - handler.taskMap[change.TaskArn] = taskList + handler.taskMap[change.taskArn()] = taskList } }() @@ -51,7 +59,7 @@ func AddTaskEvent(change api.ContainerStateChange, client api.ECSClient) { defer taskList.Unlock() // Update taskEvent - taskList.PushBack(newSendableEvent(change)) + taskList.PushBack(change) if !taskList.sending { taskList.sending = true @@ -84,8 +92,7 @@ func SubmitTaskEvents(events *eventList, client api.ECSClient) { defer events.Unlock() log.Debug("Aquired lock!") - var retErr error - retErr = nil + var err utils.RetriableError if events.Len() == 0 { log.Debug("No events left; not retrying more") @@ -99,49 +106,45 @@ func SubmitTaskEvents(events *eventList, client api.ECSClient) { event := eventToSubmit.Value.(*sendableEvent) llog := log.New("event", event) - var contErr, taskErr utils.RetriableError if event.containerShouldBeSent() { - llog.Info("Sending container change", "change", event.ContainerStateChange) - contErr = client.SubmitContainerStateChange(event.ContainerStateChange) - if contErr == nil || !contErr.Retry() { + llog.Info("Sending container change", "change", event.containerChange) + err = client.SubmitContainerStateChange(event.containerChange) + if err == nil || !err.Retry() { // submitted or can't be retried; ensure we don't retry it event.containerSent = true - event.Container.SentStatus = event.Status + if event.containerChange.SentStatus != nil { + *event.containerChange.SentStatus = event.containerChange.Status + } statesaver.Save() - if contErr != nil { - llog.Error("Unretriable error submitting container state change", "err", contErr) + if err != nil { + llog.Error("Unretriable error submitting container state change", "err", err) } else { llog.Debug("Submitted container") } - } - } - if event.taskShouldBeSent() { - llog.Info("Sending task change", "change", event.ContainerStateChange.TaskStatus) - taskErr = client.SubmitTaskStateChange(event.ContainerStateChange) - if taskErr == nil || !taskErr.Retry() { + events.Remove(eventToSubmit) + } // else, leave event on and retry it next loop through + } else if event.taskShouldBeSent() { + llog.Info("Sending task change", "change", event.taskChange) + err = client.SubmitTaskStateChange(event.taskChange) + if err == nil || !err.Retry() { // submitted or can't be retried; ensure we don't retry it event.taskSent = true - event.Task.SentStatus = event.TaskStatus + if event.taskChange.SentStatus != nil { + *event.taskChange.SentStatus = event.taskChange.Status + } statesaver.Save() - if taskErr != nil { - llog.Error("Unretriable error submitting 
container state change", "err", contErr) + if err != nil { + llog.Error("Unretriable error submitting container state change", "err", err) } else { llog.Debug("Submitted container") + backoff.Reset() } + events.Remove(eventToSubmit) } - } - - if contErr == nil && taskErr == nil { - llog.Debug("Successfully submitted event") - events.Remove(eventToSubmit) - // We had a success so reset our backoff - backoff.Reset() - } else if (contErr == nil || !contErr.Retry()) && (taskErr == nil || !taskErr.Retry()) { - // Error, but not retriable - llog.Debug("Unretriable error for event", "status", event.ContainerStateChange) - events.Remove(eventToSubmit) } else { - retErr = utils.NewMultiError(contErr, taskErr) + // Shouldn't be sent as either a task or container change event; must have been already sent + llog.Info("Not submitting redundant event; just removing") + events.Remove(eventToSubmit) } if events.Len() == 0 { @@ -151,7 +154,7 @@ func SubmitTaskEvents(events *eventList, client api.ECSClient) { return nil } - return retErr + return err }) } } diff --git a/agent/eventhandler/task_handler_types.go b/agent/eventhandler/task_handler_types.go index 1897755199c..62a06875524 100644 --- a/agent/eventhandler/task_handler_types.go +++ b/agent/eventhandler/task_handler_types.go @@ -22,37 +22,64 @@ import ( ) // Maximum number of tasks that may be handled at once by the taskHandler -const CONCURRENT_EVENT_CALLS = 3 +const concurrentEventCalls = 3 // a state change that may have a container and, optionally, a task event to // send type sendableEvent struct { - containerSent bool - taskSent bool + // Either is a contaienr event or a task event + isContainerEvent bool - api.ContainerStateChange + containerSent bool + containerChange api.ContainerStateChange + + taskSent bool + taskChange api.TaskStateChange +} + +func newSendableContainerEvent(event api.ContainerStateChange) *sendableEvent { + return &sendableEvent{ + isContainerEvent: true, + containerSent: false, + containerChange: event, + } } -func newSendableEvent(event api.ContainerStateChange) *sendableEvent { +func newSendableTaskEvent(event api.TaskStateChange) *sendableEvent { return &sendableEvent{ - containerSent: false, - taskSent: false, - ContainerStateChange: event, + isContainerEvent: false, + taskSent: false, + taskChange: event, + } +} + +func (event *sendableEvent) taskArn() string { + if event.isContainerEvent { + return event.containerChange.TaskArn } + return event.taskChange.TaskArn } func (event *sendableEvent) taskShouldBeSent() bool { - if event.TaskStatus == api.TaskStatusNone { - return false // container only event + if event.isContainerEvent { + return false } - if event.taskSent || event.Task.SentStatus >= event.TaskStatus { + tevent := event.taskChange + if tevent.Status == api.TaskStatusNone { + return false // defensive programming :) + } + if event.taskSent || (tevent.SentStatus != nil && *tevent.SentStatus >= tevent.Status) { return false // redundant event } return true } func (event *sendableEvent) containerShouldBeSent() bool { - if event.containerSent || event.Container.SentStatus >= event.Status { + if !event.isContainerEvent { + return false + } + cevent := event.containerChange + if event.containerSent || (cevent.SentStatus != nil && *cevent.SentStatus >= cevent.Status) { return false } return true @@ -73,7 +100,7 @@ type taskHandler struct { func newTaskHandler() taskHandler { taskMap := make(map[string]*eventList) - submitSemaphore := utils.NewSemaphore(CONCURRENT_EVENT_CALLS) + submitSemaphore := 
utils.NewSemaphore(concurrentEventCalls)

 return taskHandler{
 taskMap: taskMap,
diff --git a/agent/handlers/v1_handlers_test.go b/agent/handlers/v1_handlers_test.go
index 214f9140f13..b2dbb6f9579 100644
--- a/agent/handlers/v1_handlers_test.go
+++ b/agent/handlers/v1_handlers_test.go
@@ -77,7 +77,7 @@ func TestServeHttp(t *testing.T) {
 }
 // Populate Tasks and Container map in the engine.
 dockerTaskEngine, _ := taskEngine.(*engine.DockerTaskEngine)
- dockerTaskEngine.State().AddOrUpdateTask(&testTask)
+ dockerTaskEngine.State().AddTask(&testTask)
 dockerTaskEngine.State().AddContainer(&api.DockerContainer{DockerId: "docker1", DockerName: "someName", Container: containers[0]}, &testTask)

 go ServeHttp(utils.Strptr(TestContainerInstanceArn), taskEngine, &config.Config{Cluster: TestClusterArn})
@@ -156,12 +156,12 @@ func backendMappingTestHelper(containers []*api.Container, testTask api.Task, de
 taskEngine := engine.NewTaskEngine(&config.Config{})
 // Populate Tasks and Container map in the engine.
 dockerTaskEngine, _ := taskEngine.(*engine.DockerTaskEngine)
- dockerTaskEngine.State().AddOrUpdateTask(&testTask)
+ dockerTaskEngine.State().AddTask(&testTask)
 dockerTaskEngine.State().AddContainer(&api.DockerContainer{DockerId: "docker1", DockerName: "someName", Container: containers[0]}, &testTask)
 taskHandler := TasksV1RequestHandlerMaker(taskEngine)
 server := httptest.NewServer(http.HandlerFunc(taskHandler))
 defer server.Close()
- resp, err := http.Get(server.URL+"/v1/tasks")
+ resp, err := http.Get(server.URL + "/v1/tasks")
 if err != nil {
 t.Fatalf("Get: %v", err)
 }
diff --git a/agent/statemanager/state_manager_test.go b/agent/statemanager/state_manager_test.go
index 193bd7d1120..be528d2841a 100644
--- a/agent/statemanager/state_manager_test.go
+++ b/agent/statemanager/state_manager_test.go
@@ -57,7 +57,7 @@ func TestStateManager(t *testing.T) {
 containerInstanceArn = "containerInstanceArn"

 testTask := &api.Task{Arn: "test-arn"}
- taskEngine.AddTask(testTask)
+ taskEngine.(*engine.DockerTaskEngine).State().AddTask(testTask)

 err = manager.Save()
 if err != nil {
diff --git a/agent/utils/ttime/test_time.go b/agent/utils/ttime/test_time.go
index 75885843631..066b93e8961 100644
--- a/agent/utils/ttime/test_time.go
+++ b/agent/utils/ttime/test_time.go
@@ -41,6 +41,17 @@ func (t *TestTime) Now() time.Time {
 return time.Now().Add(t.warped)
 }

+// After returns a channel which is written to after the given duration, taking
+// into account time-warping
+func (t *TestTime) After(d time.Duration) <-chan time.Time {
+ done := make(chan time.Time)
+ go func() {
+ t.Sleep(d)
+ done <- t.Now()
+ }()
+ return done
+}
+
 // Sleep sleeps the given duration in mock-time; that is to say that Warps will
 // reduce the amount of time slept and LudicrousSpeed will cause instant
 // success.
diff --git a/agent/utils/ttime/ttime.go b/agent/utils/ttime/ttime.go
index bdb55501a9b..1424e92ca0d 100644
--- a/agent/utils/ttime/ttime.go
+++ b/agent/utils/ttime/ttime.go
@@ -7,6 +7,7 @@ import "time"
 type Time interface {
 Now() time.Time
 Sleep(d time.Duration)
+ After(d time.Duration) <-chan time.Time
 }

 // DefaultTime is a Time that behaves normally
@@ -24,6 +25,11 @@ func (*DefaultTime) Sleep(d time.Duration) {
 time.Sleep(d)
 }

+// After sleeps for the given duration and then writes to the returned channel
+func (*DefaultTime) After(d time.Duration) <-chan time.Time {
+ return time.After(d)
+}
+
 // SetTime configures what 'Time' implementation to use for each of the
 // package-level methods.
func SetTime(t Time) { @@ -44,3 +50,8 @@ func Sleep(d time.Duration) { func Since(t time.Time) time.Duration { return _time.Now().Sub(t) } + +// After calls the implementations After method +func After(t time.Duration) <-chan time.Time { + return _time.After(t) +} diff --git a/agent/utils/ttime/ttime_test.go b/agent/utils/ttime/ttime_test.go index 24405a1d233..9ffdc812118 100644 --- a/agent/utils/ttime/ttime_test.go +++ b/agent/utils/ttime/ttime_test.go @@ -53,3 +53,22 @@ func TestSleepWarp(t *testing.T) { t.Error("Time should have been warped") } } + +func TestAfter(t *testing.T) { + testTime := NewTestTime() + SetTime(testTime) + + done := make(chan bool) + + realnow := time.Now() + go func() { + <-After(1 * time.Second) + <-After(1 * time.Second) + done <- true + }() + testTime.Warp(15 * time.Second) + <-done + if time.Since(realnow) > 1*time.Second { + t.Error("Time should have been warped") + } +} diff --git a/misc/volumes-test/Dockerfile b/misc/volumes-test/Dockerfile index afafa69b7bc..170a2e84a2c 100644 --- a/misc/volumes-test/Dockerfile +++ b/misc/volumes-test/Dockerfile @@ -1,4 +1,3 @@ - # Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You may