Skip to content

Commit 653a586

Browse files
jmcarpashb
authored andcommitted
[AIRFLOW-3074] Add relevant ECS options to ECS operator. (#3908)
The ECS operator currently supports only a subset of available options for running ECS tasks. This patch adds all ECS options that could be relevant to airflow; options that wouldn't make sense here, like `count`, were skipped.
1 parent 428a91d commit 653a586

File tree

2 files changed

+73
-11
lines changed

2 files changed

+73
-11
lines changed

airflow/contrib/operators/ecs_operator.py

+31-8
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ class ECSOperator(BaseOperator):
4646
:type region_name: str
4747
:param launch_type: the launch type on which to run your task ('EC2' or 'FARGATE')
4848
:type launch_type: str
49+
:param group: the name of the task group associated with the task
50+
:type group: str
51+
:param placement_constraints: an array of placement constraint objects to use for
52+
the task
53+
:type placement_constraints: list
54+
:param platform_version: the platform version on which your task is running
55+
:type platform_version: str
56+
:param network_configuration: the network configuration for the task
57+
:type network_configuration: dict
4958
"""
5059

5160
ui_color = '#f0ede4'
@@ -55,7 +64,9 @@ class ECSOperator(BaseOperator):
5564

5665
@apply_defaults
5766
def __init__(self, task_definition, cluster, overrides,
58-
aws_conn_id=None, region_name=None, launch_type='EC2', **kwargs):
67+
aws_conn_id=None, region_name=None, launch_type='EC2',
68+
group=None, placement_constraints=None, platform_version='LATEST',
69+
network_configuration=None, **kwargs):
5970
super(ECSOperator, self).__init__(**kwargs)
6071

6172
self.aws_conn_id = aws_conn_id
@@ -64,6 +75,10 @@ def __init__(self, task_definition, cluster, overrides,
6475
self.cluster = cluster
6576
self.overrides = overrides
6677
self.launch_type = launch_type
78+
self.group = group
79+
self.placement_constraints = placement_constraints
80+
self.platform_version = platform_version
81+
self.network_configuration = network_configuration
6782

6883
self.hook = self.get_hook()
6984

@@ -79,13 +94,21 @@ def execute(self, context):
7994
region_name=self.region_name
8095
)
8196

82-
response = self.client.run_task(
83-
cluster=self.cluster,
84-
taskDefinition=self.task_definition,
85-
overrides=self.overrides,
86-
startedBy=self.owner,
87-
launchType=self.launch_type
88-
)
97+
run_opts = {
98+
'cluster': self.cluster,
99+
'taskDefinition': self.task_definition,
100+
'overrides': self.overrides,
101+
'startedBy': self.owner,
102+
'launchType': self.launch_type,
103+
'platformVersion': self.platform_version,
104+
}
105+
if self.group is not None:
106+
run_opts['group'] = self.group
107+
if self.placement_constraints is not None:
108+
run_opts['placementConstraints'] = self.placement_constraints
109+
if self.network_configuration is not None:
110+
run_opts['networkConfiguration'] = self.network_configuration
111+
response = self.client.run_task(**run_opts)
89112

90113
failures = response['failures']
91114
if len(failures) > 0:

tests/contrib/operators/test_ecs_operator.py

+42-3
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,20 @@ def setUp(self, aws_hook_mock):
6969
cluster='c',
7070
overrides={},
7171
aws_conn_id=None,
72-
region_name='eu-west-1')
72+
region_name='eu-west-1',
73+
group='group',
74+
placement_constraints=[
75+
{
76+
'expression': 'attribute:ecs.instance-type =~ t2.*',
77+
'type': 'memberOf'
78+
}
79+
],
80+
network_configuration={
81+
'awsvpcConfiguration': {
82+
'securityGroups': ['sg-123abc']
83+
}
84+
}
85+
)
7386

7487
def test_init(self):
7588
self.assertEqual(self.ecs.region_name, 'eu-west-1')
@@ -99,7 +112,20 @@ def test_execute_without_failures(self, check_mock, wait_mock):
99112
launchType='EC2',
100113
overrides={},
101114
startedBy=mock.ANY, # Can by 'airflow' or 'Airflow'
102-
taskDefinition='t'
115+
taskDefinition='t',
116+
group='group',
117+
placementConstraints=[
118+
{
119+
'expression': 'attribute:ecs.instance-type =~ t2.*',
120+
'type': 'memberOf'
121+
}
122+
],
123+
platformVersion='LATEST',
124+
networkConfiguration={
125+
'awsvpcConfiguration': {
126+
'securityGroups': ['sg-123abc']
127+
}
128+
}
103129
)
104130

105131
wait_mock.assert_called_once_with()
@@ -123,7 +149,20 @@ def test_execute_with_failures(self):
123149
launchType='EC2',
124150
overrides={},
125151
startedBy=mock.ANY, # Can by 'airflow' or 'Airflow'
126-
taskDefinition='t'
152+
taskDefinition='t',
153+
group='group',
154+
placementConstraints=[
155+
{
156+
'expression': 'attribute:ecs.instance-type =~ t2.*',
157+
'type': 'memberOf'
158+
}
159+
],
160+
platformVersion='LATEST',
161+
networkConfiguration={
162+
'awsvpcConfiguration': {
163+
'securityGroups': ['sg-123abc']
164+
}
165+
}
127166
)
128167

129168
def test_wait_end_tasks(self):

0 commit comments

Comments
 (0)