
Commit 739fbff

[AIRFLOW-3044] Dataflow operators accept templated job_name param (apache#3887)
* Default value of new job_name param is templated task_id, to match the existing behavior as much as possible.
* Change expected value in test_mlengine_operator_utils.py to match default for new job_name param.
1 parent e60b208 commit 739fbff
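
Since ``job_name`` joins the operators' ``template_fields``, it can carry Jinja expressions like the other templated parameters. A minimal usage sketch of what this enables (the DAG id, pipeline path, and project below are hypothetical placeholders, not part of this commit):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

# Hypothetical DAG used only to illustrate the templated job_name.
dag = DAG('example_dataflow_job_name',
          start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

start_python_job = DataFlowPythonOperator(
    task_id='start_python_job',
    py_file='/path/to/pipelines/wordcount.py',
    # Rendered per run, e.g. 'wordcount-20180901'; if omitted, job_name falls
    # back to the new default '{{task.task_id}}', i.e. the task's own id.
    job_name='wordcount-{{ ds_nodash }}',
    dataflow_default_options={'project': 'my-gcp-project'},
    dag=dag)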

5 files changed, +138 −60 lines changed


airflow/contrib/hooks/gcp_dataflow_hook.py

+18 −24
@@ -190,11 +190,9 @@ def get_conn(self):
         return build(
             'dataflow', 'v1b3', http=http_authorized, cache_discovery=False)

-    def _start_dataflow(self, task_id, variables, name,
-                        command_prefix, label_formatter):
+    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
         variables = self._set_variables(variables)
-        cmd = command_prefix + self._build_cmd(task_id, variables,
-                                               label_formatter)
+        cmd = command_prefix + self._build_cmd(variables, label_formatter)
         job_id = _Dataflow(cmd).wait_for_done()
         _DataflowJob(self.get_conn(), variables['project'], name,
                      variables['region'],
@@ -208,58 +206,54 @@ def _set_variables(variables):
             variables['region'] = DEFAULT_DATAFLOW_LOCATION
         return variables

-    def start_java_dataflow(self, task_id, variables, dataflow, job_class=None,
+    def start_java_dataflow(self, job_name, variables, dataflow, job_class=None,
                             append_job_name=True):
-        name = self._build_dataflow_job_name(task_id, append_job_name)
+        name = self._build_dataflow_job_name(job_name, append_job_name)
         variables['jobName'] = name

         def label_formatter(labels_dict):
             return ['--labels={}'.format(
                 json.dumps(labels_dict).replace(' ', ''))]
         command_prefix = (["java", "-cp", dataflow, job_class] if job_class
                           else ["java", "-jar", dataflow])
-        self._start_dataflow(task_id, variables, name,
-                             command_prefix, label_formatter)
+        self._start_dataflow(variables, name, command_prefix, label_formatter)

-    def start_template_dataflow(self, task_id, variables, parameters, dataflow_template,
+    def start_template_dataflow(self, job_name, variables, parameters, dataflow_template,
                                 append_job_name=True):
-        name = self._build_dataflow_job_name(task_id, append_job_name)
+        name = self._build_dataflow_job_name(job_name, append_job_name)
         self._start_template_dataflow(
             name, variables, parameters, dataflow_template)

-    def start_python_dataflow(self, task_id, variables, dataflow, py_options,
+    def start_python_dataflow(self, job_name, variables, dataflow, py_options,
                               append_job_name=True):
-        name = self._build_dataflow_job_name(task_id, append_job_name)
+        name = self._build_dataflow_job_name(job_name, append_job_name)
         variables['job_name'] = name

         def label_formatter(labels_dict):
             return ['--labels={}={}'.format(key, value)
                     for key, value in labels_dict.items()]
-        # TODO: Change python2 to python when Beam supports both python 2 and 3
-        # Remember to change the test case too
-        self._start_dataflow(task_id, variables, name,
-                             ["python2"] + py_options + [dataflow],
+        self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
                              label_formatter)

     @staticmethod
-    def _build_dataflow_job_name(task_id, append_job_name=True):
-        task_id = str(task_id).replace('_', '-')
+    def _build_dataflow_job_name(job_name, append_job_name=True):
+        base_job_name = str(job_name).replace('_', '-')

-        if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", task_id):
+        if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", base_job_name):
             raise ValueError(
                 'Invalid job_name ({}); the name must consist of'
                 'only the characters [-a-z0-9], starting with a '
-                'letter and ending with a letter or number '.format(task_id))
+                'letter and ending with a letter or number '.format(base_job_name))

         if append_job_name:
-            job_name = task_id + "-" + str(uuid.uuid1())[:8]
+            safe_job_name = base_job_name + "-" + str(uuid.uuid4())[:8]
         else:
-            job_name = task_id
+            safe_job_name = base_job_name

-        return job_name
+        return safe_job_name

     @staticmethod
-    def _build_cmd(task_id, variables, label_formatter):
+    def _build_cmd(variables, label_formatter):
         command = ["--runner=DataflowRunner"]
         if variables is not None:
             for attr, value in variables.items():
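
For readers tracing the renamed helper above, the behavior of ``_build_dataflow_job_name`` can be summarized with a standalone sketch (an illustration mirroring the diff, not the hook itself): underscores are replaced with hyphens, the result must match Dataflow's job-name pattern, and a short ``uuid4`` suffix is appended unless ``append_job_name=False``.

import re
import uuid

def build_dataflow_job_name(job_name, append_job_name=True):
    # Mirror of the hook logic shown in the diff above.
    base_job_name = str(job_name).replace('_', '-')
    if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", base_job_name):
        raise ValueError(
            'Invalid job_name ({}); the name must consist of only the '
            'characters [-a-z0-9], starting with a letter and ending with '
            'a letter or number'.format(base_job_name))
    if append_job_name:
        return base_job_name + "-" + str(uuid.uuid4())[:8]
    return base_job_name

print(build_dataflow_job_name('start_python_job'))  # e.g. start-python-job-1f3a9c2e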

airflow/contrib/operators/dataflow_operator.py

+86 −9
@@ -33,6 +33,43 @@ class DataFlowJavaOperator(BaseOperator):
     Start a Java Cloud DataFlow batch job. The parameters of the operation
     will be passed to the job.

+    .. seealso::
+        For more detail on job submission have a look at the reference:
+        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+
+    :param jar: The reference to a self executing DataFlow jar (templated).
+    :type jar: str
+    :param job_name: The 'jobName' to use when executing the DataFlow job
+        (templated). This ends up being set in the pipeline options, so any entry
+        with key ``'jobName'`` in ``options`` will be overwritten.
+    :type job_name: str
+    :param dataflow_default_options: Map of default job options.
+    :type dataflow_default_options: dict
+    :param options: Map of job specific options.
+    :type options: dict
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param poll_sleep: The time in seconds to sleep between polling Google
+        Cloud Platform for the dataflow job status while the job is in the
+        JOB_STATE_RUNNING state.
+    :type poll_sleep: int
+    :param job_class: The name of the dataflow job class to be executued, it
+        is often not the main class configured in the dataflow jar file.
+    :type job_class: str
+
+    ``jar``, ``options``, and ``job_name`` are templated so you can use variables in them.
+
+    Note that both
+    ``dataflow_default_options`` and ``options`` will be merged to specify pipeline
+    execution parameter, and ``dataflow_default_options`` is expected to save
+    high-level options, for instances, project and zone information, which
+    apply to all dataflow operators in the DAG.
+
     It's a good practice to define dataflow_* parameters in the default_args of the dag
     like the project, zone and staging location.

@@ -68,13 +105,14 @@ class DataFlowJavaOperator(BaseOperator):

     Both ``jar`` and ``options`` are templated so you can use variables in them.
     """
-    template_fields = ['options', 'jar']
+    template_fields = ['options', 'jar', 'job_name']
     ui_color = '#0273d4'

     @apply_defaults
     def __init__(
             self,
             jar,
+            job_name='{{task.task_id}}',
             dataflow_default_options=None,
             options=None,
             gcp_conn_id='google_cloud_default',
@@ -125,6 +163,7 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.jar = jar
+        self.job_name = job_name
         self.dataflow_default_options = dataflow_default_options
         self.options = options
         self.poll_sleep = poll_sleep
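
A usage sketch for the change above (the jar path, bucket, and project are hypothetical, and a ``dag`` object is assumed to exist):

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

start_java_job = DataFlowJavaOperator(
    task_id='start_java_job',
    jar='gs://my-bucket/jars/wordcount-bundled.jar',
    # job_name is templated; leaving it out falls back to '{{task.task_id}}',
    # so the Dataflow job is named after the task (underscores become hyphens
    # when the hook builds the final name).
    job_name='wordcount-{{ ds_nodash }}',
    options={'output': 'gs://my-bucket/output'},
    dataflow_default_options={'project': 'my-gcp-project',
                              'zone': 'europe-west1-d'},
    dag=dag)  # assumes an existing DAG object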
@@ -141,14 +180,35 @@ def execute(self, context):
         dataflow_options = copy.copy(self.dataflow_default_options)
         dataflow_options.update(self.options)

-        hook.start_java_dataflow(self.task_id, dataflow_options,
+        hook.start_java_dataflow(self.job_name, dataflow_options,
                                  self.jar, self.job_class)


 class DataflowTemplateOperator(BaseOperator):
     """
     Start a Templated Cloud DataFlow batch job. The parameters of the operation
     will be passed to the job.
+
+    :param template: The reference to the DataFlow template.
+    :type template: str
+    :param job_name: The 'jobName' to use when executing the DataFlow template
+        (templated).
+    :param dataflow_default_options: Map of default job environment options.
+    :type dataflow_default_options: dict
+    :param parameters: Map of job specific parameters for the template.
+    :type parameters: dict
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param poll_sleep: The time in seconds to sleep between polling Google
+        Cloud Platform for the dataflow job status while the job is in the
+        JOB_STATE_RUNNING state.
+    :type poll_sleep: int
+
     It's a good practice to define dataflow_* parameters in the default_args of the dag
     like the project, zone and staging location.

@@ -183,16 +243,27 @@ class DataflowTemplateOperator(BaseOperator):
            gcp_conn_id='gcp-airflow-service-account',
            dag=my-dag)

-    ``template``, ``dataflow_default_options`` and ``parameters`` are templated so you can
-    use variables in them.
+    ``template``, ``dataflow_default_options``, ``parameters``, and ``job_name`` are
+    templated so you can use variables in them.
+
+    Note that ``dataflow_default_options`` is expected to save high-level options
+    for project information, which apply to all dataflow operators in the DAG.
+
+    .. seealso::
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3
+        /LaunchTemplateParameters
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+        For more detail on job template execution have a look at the reference:
+        https://cloud.google.com/dataflow/docs/templates/executing-templates
     """
-    template_fields = ['parameters', 'dataflow_default_options', 'template']
+    template_fields = ['parameters', 'dataflow_default_options', 'template', 'job_name']
     ui_color = '#0273d4'

     @apply_defaults
     def __init__(
             self,
             template,
+            job_name='{{task.task_id}}',
             dataflow_default_options=None,
             parameters=None,
             gcp_conn_id='google_cloud_default',
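
A corresponding sketch for ``DataflowTemplateOperator`` (the template path, bucket, and project are placeholders, and a ``dag`` object is assumed to exist):

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

start_template_job = DataflowTemplateOperator(
    task_id='start_template_job',
    template='gs://dataflow-templates/latest/Word_Count',
    # Templated job_name; omit it to default to '{{task.task_id}}'.
    job_name='templated-wordcount-{{ ds_nodash }}',
    parameters={'inputFile': 'gs://my-bucket/input/shakespeare.txt',
                'output': 'gs://my-bucket/output/wordcount'},
    dataflow_default_options={'project': 'my-gcp-project'},
    dag=dag)  # assumes an existing DAG object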
@@ -240,14 +311,15 @@ def __init__(
         self.dataflow_default_options = dataflow_default_options
         self.poll_sleep = poll_sleep
         self.template = template
+        self.job_name = job_name
         self.parameters = parameters

     def execute(self, context):
         hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to,
                             poll_sleep=self.poll_sleep)

-        hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
+        hook.start_template_dataflow(self.job_name, self.dataflow_default_options,
                                      self.parameters, self.template)

@@ -266,6 +338,10 @@ class DataFlowPythonOperator(BaseOperator):
     :param py_file: Reference to the python dataflow pipleline file.py, e.g.,
         /some/local/file/path/to/your/python/pipeline/file.
     :type py_file: string
+    :param job_name: The 'job_name' to use when executing the DataFlow job
+        (templated). This ends up being set in the pipeline options, so any entry
+        with key ``'jobName'`` or ``'job_name'`` in ``options`` will be overwritten.
+    :type job_name: str
     :param py_options: Additional python options.
     :type pyt_options: list of strings, e.g., ["-m", "-v"].
     :param dataflow_default_options: Map of default job options.
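
The overwrite noted in the docstring above happens because the hook assigns the sanitized name into the option dictionary last; a tiny plain-Python illustration (not Airflow code, values are made up):

# Any 'job_name' the user put into options/dataflow_default_options...
variables = {'project': 'my-gcp-project', 'job_name': 'user-supplied-name'}

# ...is replaced when the hook stores the name it built from the operator's
# job_name parameter (see start_python_dataflow in the hook diff above).
variables['job_name'] = 'start-python-job-1f3a9c2e'
print(variables['job_name'])  # -> start-python-job-1f3a9c2e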
@@ -284,13 +360,13 @@ class DataFlowPythonOperator(BaseOperator):
         JOB_STATE_RUNNING state.
     :type poll_sleep: int
     """
-
-    template_fields = ['options', 'dataflow_default_options']
+    template_fields = ['options', 'dataflow_default_options', 'job_name']

     @apply_defaults
     def __init__(
             self,
             py_file,
+            job_name='{{task.task_id}}',
             py_options=None,
             dataflow_default_options=None,
             options=None,
@@ -303,6 +379,7 @@ def __init__(
         super(DataFlowPythonOperator, self).__init__(*args, **kwargs)

         self.py_file = py_file
+        self.job_name = job_name
         self.py_options = py_options or []
         self.dataflow_default_options = dataflow_default_options or {}
         self.options = options or {}
@@ -328,7 +405,7 @@ def execute(self, context):
         formatted_options = {camel_to_snake(key): dataflow_options[key]
                              for key in dataflow_options}
         hook.start_python_dataflow(
-            self.task_id, formatted_options,
+            self.job_name, formatted_options,
             self.py_file, self.py_options)
