@@ -68,6 +68,9 @@ class DataprocClusterCreateOperator(BaseOperator):
68
68
:type metadata: dict
69
69
:param image_version: the version of software inside the Dataproc cluster
70
70
:type image_version: string
71
+ :param custom_image: custom Dataproc image; for more info, see
72
+ https://cloud.google.com/dataproc/docs/guides/dataproc-images
73
+ :type custom_image: string
71
74
:param properties: dict of properties to set on
72
75
config files (e.g. spark-defaults.conf), see
73
76
https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
@@ -148,6 +151,7 @@ def __init__(self,
148
151
init_actions_uris = None ,
149
152
init_action_timeout = "10m" ,
150
153
metadata = None ,
154
+ custom_image = None ,
151
155
image_version = None ,
152
156
properties = None ,
153
157
master_machine_type = 'n1-standard-4' ,
@@ -180,6 +184,7 @@ def __init__(self,
180
184
self .init_actions_uris = init_actions_uris
181
185
self .init_action_timeout = init_action_timeout
182
186
self .metadata = metadata
187
+ self .custom_image = custom_image
183
188
self .image_version = image_version
184
189
self .properties = properties
185
190
self .master_machine_type = master_machine_type
@@ -201,6 +206,9 @@ def __init__(self,
201
206
self .auto_delete_time = auto_delete_time
202
207
self .auto_delete_ttl = auto_delete_ttl
203
208
209
+ assert not (self .custom_image and self .image_version ), \
210
+ "custom_image and image_version can't be both set"
211
+
204
212
def _get_cluster_list_for_project (self , service ):
205
213
result = service .projects ().regions ().clusters ().list (
206
214
projectId = self .project_id ,
@@ -338,6 +346,12 @@ def _build_cluster_data(self):
338
346
cluster_data ['config' ]['gceClusterConfig' ]['tags' ] = self .tags
339
347
if self .image_version :
340
348
cluster_data ['config' ]['softwareConfig' ]['imageVersion' ] = self .image_version
349
+ elif self .custom_image :
350
+ custom_image_url = 'https://www.googleapis.com/compute/beta/projects/' \
351
+ '{}/global/images/{}' .format (self .project_id ,
352
+ self .custom_image )
353
+ cluster_data ['config' ]['masterConfig' ]['imageUri' ] = custom_image_url
354
+ cluster_data ['config' ]['workerConfig' ]['imageUri' ] = custom_image_url
341
355
if self .properties :
342
356
cluster_data ['config' ]['softwareConfig' ]['properties' ] = self .properties
343
357
if self .idle_delete_ttl :
0 commit comments