Skip to content

Commit d176fa5

Browse files
danmactough authored and ashb committed
[AIRFLOW-3046] Report fail from ECS Operator when host terminates (#4039)
Add check for host termination to ECS Operator. If an ECS task fails to complete because the host it's running on is terminated, we need to raise an exception so it can be retried.
1 parent 9681954 commit d176fa5

File tree

2 files changed

+56
-6
lines changed

2 files changed

+56
-6
lines changed

airflow/contrib/operators/ecs_operator.py

+10
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919
import sys
20+
import re
2021

2122
from airflow.exceptions import AirflowException
2223
from airflow.models import BaseOperator
@@ -139,6 +140,15 @@ def _check_success_task(self):
139140
raise AirflowException(response)
140141

141142
for task in response['tasks']:
143+
# This is a `stoppedReason` that indicates a task has not
144+
# successfully finished, but there is no other indication of failure
145+
# in the response.
146+
# See, https://docs.aws.amazon.com/AmazonECS/latest/developerguide/stopped-task-errors.html # noqa E501
147+
if re.match(r'Host EC2 \(instance .+?\) (stopped|terminated)\.',
148+
task.get('stoppedReason', '')):
149+
raise AirflowException(
150+
'The task was stopped because the host instance terminated: {}'.
151+
format(task.get('stoppedReason', '')))
142152
containers = task['containers']
143153
for container in containers:
144154
if container.get('lastStatus') == 'STOPPED' and \

tests/contrib/operators/test_ecs_operator.py

+46 -6
Original file line number | Diff line number | Diff line change
@@ -172,8 +172,10 @@ def test_wait_end_tasks(self):
172172

173173
self.ecs._wait_for_task_ended()
174174
client_mock.get_waiter.assert_called_once_with('tasks_stopped')
175-
client_mock.get_waiter.return_value.wait.assert_called_once_with(cluster='c', tasks=['arn'])
176-
self.assertEquals(sys.maxsize, client_mock.get_waiter.return_value.config.max_attempts)
175+
client_mock.get_waiter.return_value.wait.assert_called_once_with(
176+
cluster='c', tasks=['arn'])
177+
self.assertEquals(
178+
sys.maxsize, client_mock.get_waiter.return_value.config.max_attempts)
177179

178180
def test_check_success_tasks_raises(self):
179181
client_mock = mock.Mock()
@@ -197,7 +199,8 @@ def test_check_success_tasks_raises(self):
197199
self.assertIn("'name': 'foo'", str(e.exception))
198200
self.assertIn("'lastStatus': 'STOPPED'", str(e.exception))
199201
self.assertIn("'exitCode': 1", str(e.exception))
200-
client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
202+
client_mock.describe_tasks.assert_called_once_with(
203+
cluster='c', tasks=['arn'])
201204

202205
def test_check_success_tasks_raises_pending(self):
203206
client_mock = mock.Mock()
@@ -217,7 +220,8 @@ def test_check_success_tasks_raises_pending(self):
217220
self.assertIn("This task is still pending ", str(e.exception))
218221
self.assertIn("'name': 'container-name'", str(e.exception))
219222
self.assertIn("'lastStatus': 'PENDING'", str(e.exception))
220-
client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
223+
client_mock.describe_tasks.assert_called_once_with(
224+
cluster='c', tasks=['arn'])
221225

222226
def test_check_success_tasks_raises_multiple(self):
223227
client_mock = mock.Mock()
@@ -236,7 +240,42 @@ def test_check_success_tasks_raises_multiple(self):
236240
}]
237241
}
238242
self.ecs._check_success_task()
239-
client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
243+
client_mock.describe_tasks.assert_called_once_with(
244+
cluster='c', tasks=['arn'])
245+
246+
def test_host_terminated_raises(self):
247+
client_mock = mock.Mock()
248+
self.ecs.client = client_mock
249+
self.ecs.arn = 'arn'
250+
client_mock.describe_tasks.return_value = {
251+
'tasks': [{
252+
'stoppedReason': 'Host EC2 (instance i-1234567890abcdef) terminated.',
253+
"containers": [
254+
{
255+
"containerArn": "arn:aws:ecs:us-east-1:012345678910:container/e1ed7aac-d9b2-4315-8726-d2432bf11868", # noqa: E501
256+
"lastStatus": "RUNNING",
257+
"name": "wordpress",
258+
"taskArn": "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55" # noqa: E501
259+
}
260+
],
261+
"desiredStatus": "STOPPED",
262+
"lastStatus": "STOPPED",
263+
"taskArn": "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55", # noqa: E501
264+
"taskDefinitionArn": "arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11" # noqa: E501
265+
266+
}]
267+
}
268+
269+
with self.assertRaises(AirflowException) as e:
270+
self.ecs._check_success_task()
271+
272+
self.assertIn(
273+
"The task was stopped because the host instance terminated:",
274+
str(e.exception))
275+
self.assertIn("Host EC2 (", str(e.exception))
276+
self.assertIn(") terminated", str(e.exception))
277+
client_mock.describe_tasks.assert_called_once_with(
278+
cluster='c', tasks=['arn'])
240279

241280
def test_check_success_task_not_raises(self):
242281
client_mock = mock.Mock()
@@ -252,7 +291,8 @@ def test_check_success_task_not_raises(self):
252291
}]
253292
}
254293
self.ecs._check_success_task()
255-
client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
294+
client_mock.describe_tasks.assert_called_once_with(
295+
cluster='c', tasks=['arn'])
256296

257297

258298
if __name__ == '__main__':

0 commit comments

Comments
 (0)