Skip to content

Commit 1f29023

Browse files
potiuk authored and
Alice Berard committed
[AIRFLOW-2912] Add Deploy and Delete operators for GCF (apache#3969)
Both Deploy and Delete operators interact with Google Cloud Functions to manage functions. Both are idempotent and make use of GcfHook - hook that encapsulates communication with GCP over GCP API.
1 parent 2396860 commit 1f29023

File tree

7 files changed

+1707
-0
lines changed

7 files changed

+1707
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
20+
"""
21+
Example Airflow DAG that deletes a Google Cloud Function.
22+
This DAG relies on the following Airflow variables
23+
https://airflow.apache.org/concepts.html#variables
24+
* PROJECT_ID - Google Cloud Project where the Cloud Function exists.
25+
* LOCATION - Google Cloud Functions region where the function exists.
26+
* ENTRYPOINT - Name of the executable function in the source code.
27+
"""
28+
29+
import datetime
30+
31+
import airflow
32+
from airflow import models
33+
from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator
34+
35+
# [START howto_operator_gcf_delete_args]
36+
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
37+
LOCATION = models.Variable.get('LOCATION', '')
38+
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
39+
# A fully-qualified name of the function to delete
40+
41+
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
42+
ENTRYPOINT)
43+
default_args = {
44+
'start_date': airflow.utils.dates.days_ago(1)
45+
}
46+
# [END howto_operator_gcf_delete_args]
47+
48+
with models.DAG(
49+
'example_gcp_function_delete',
50+
default_args=default_args,
51+
schedule_interval=datetime.timedelta(days=1)
52+
) as dag:
53+
# [START howto_operator_gcf_delete]
54+
t1 = GcfFunctionDeleteOperator(
55+
task_id="gcf_delete_task",
56+
name=FUNCTION_NAME
57+
)
58+
# [END howto_operator_gcf_delete]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
20+
"""
21+
Example Airflow DAG that creates a Google Cloud Function and then deletes it.
22+
23+
This DAG relies on the following Airflow variables
24+
https://airflow.apache.org/concepts.html#variables
25+
* PROJECT_ID - Google Cloud Project to use for the Cloud Function.
26+
* LOCATION - Google Cloud Functions region where the function should be
27+
created.
28+
* SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
29+
or
30+
* SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
31+
or
32+
* ZIP_PATH - Local path to the zipped source archive
33+
or
34+
* SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function is
35+
defined in a supported Cloud Source Repository URL format
36+
https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
37+
* ENTRYPOINT - Name of the executable function in the source code.
38+
"""
39+
40+
import datetime
41+
42+
from airflow import models
43+
from airflow.contrib.operators.gcp_function_operator \
44+
import GcfFunctionDeployOperator, GcfFunctionDeleteOperator
45+
from airflow.utils import dates
46+
47+
# [START howto_operator_gcf_deploy_variables]
48+
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
49+
LOCATION = models.Variable.get('LOCATION', '')
50+
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
51+
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
52+
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
53+
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
54+
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
55+
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
56+
ENTRYPOINT)
57+
RUNTIME = 'nodejs6'
58+
VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)
59+
60+
# [END howto_operator_gcf_deploy_variables]
61+
62+
# [START howto_operator_gcf_deploy_body]
63+
body = {
64+
"name": FUNCTION_NAME,
65+
"entryPoint": ENTRYPOINT,
66+
"runtime": RUNTIME,
67+
"httpsTrigger": {}
68+
}
69+
# [END howto_operator_gcf_deploy_body]
70+
71+
# [START howto_operator_gcf_deploy_args]
72+
default_args = {
73+
'start_date': dates.days_ago(1),
74+
'project_id': PROJECT_ID,
75+
'location': LOCATION,
76+
'body': body,
77+
'validate_body': VALIDATE_BODY
78+
}
79+
# [END howto_operator_gcf_deploy_args]
80+
81+
# [START howto_operator_gcf_deploy_variants]
82+
if SOURCE_ARCHIVE_URL:
83+
body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
84+
elif SOURCE_REPOSITORY:
85+
body['sourceRepository'] = {
86+
'url': SOURCE_REPOSITORY
87+
}
88+
elif ZIP_PATH:
89+
body['sourceUploadUrl'] = ''
90+
default_args['zip_path'] = ZIP_PATH
91+
elif SOURCE_UPLOAD_URL:
92+
body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
93+
else:
94+
raise Exception("Please provide one of the source_code parameters")
95+
# [END howto_operator_gcf_deploy_variants]
96+
97+
98+
with models.DAG(
99+
'example_gcp_function_deploy_delete',
100+
default_args=default_args,
101+
schedule_interval=datetime.timedelta(days=1)
102+
) as dag:
103+
# [START howto_operator_gcf_deploy]
104+
deploy_task = GcfFunctionDeployOperator(
105+
task_id="gcf_deploy_task",
106+
name=FUNCTION_NAME
107+
)
108+
# [END howto_operator_gcf_deploy]
109+
delete_task = GcfFunctionDeleteOperator(
110+
task_id="gcf_delete_task",
111+
name=FUNCTION_NAME
112+
)
113+
deploy_task >> delete_task
+195
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
20+
import time
21+
import requests
22+
from googleapiclient.discovery import build
23+
24+
from airflow import AirflowException
25+
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
26+
27+
# Number of retries passed to every googleapiclient ``execute()`` call so
# that "retriable" requests are retried automatically on transient failures.
NUM_RETRIES = 5

# Seconds to sleep between consecutive polls of a long-running operation's
# status while waiting for it to complete.
TIME_TO_SLEEP_IN_SECONDS = 1
33+
34+
35+
# noinspection PyAbstractClass
class GcfHook(GoogleCloudBaseHook):
    """
    Hook for Google Cloud Functions APIs.

    Requests go through the googleapiclient discovery service built in
    :meth:`get_conn`; long-running operations are polled until completion.
    """
    # Lazily-built googleapiclient service object, cached per hook instance.
    _conn = None

    def __init__(self,
                 api_version,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None):
        super(GcfHook, self).__init__(gcp_conn_id, delegate_to)
        self.api_version = api_version

    def get_conn(self):
        """
        Retrieves the connection to Google Cloud Functions.

        :return: Google Cloud Function services object
        :rtype: dict
        """
        if not self._conn:
            # Build the discovery client once and reuse it on later calls.
            self._conn = build('cloudfunctions', self.api_version,
                               http=self._authorize(),
                               cache_discovery=False)
        return self._conn

    def get_function(self, name):
        """
        Returns the function with the given name.

        :param name: name of the function
        :type name: str
        :return: a CloudFunction object representing the function
        :rtype: dict
        """
        functions = self.get_conn().projects().locations().functions()
        return functions.get(name=name).execute(num_retries=NUM_RETRIES)

    def list_functions(self, full_location):
        """
        Lists all functions created in the given location.

        :param full_location: full location including project, in the form
            projects/<PROJECT>/locations/<LOCATION>
        :type full_location: str
        :return: list of CloudFunction objects representing functions in the
            location
        :rtype: [dict]
        """
        functions = self.get_conn().projects().locations().functions()
        response = functions.list(
            parent=full_location).execute(num_retries=NUM_RETRIES)
        # The "functions" key is absent when the location has no functions.
        return response.get("functions", [])

    def create_new_function(self, full_location, body):
        """
        Creates a new cloud function in the given location with the body
        specified, then waits for the resulting operation to finish.

        :param full_location: full location including project, in the form
            projects/<PROJECT>/locations/<LOCATION>
        :type full_location: str
        :param body: body required by the cloud function insert API
        :type body: dict
        :return: response returned by the completed operation
        :rtype: dict
        """
        functions = self.get_conn().projects().locations().functions()
        operation = functions.create(
            location=full_location,
            body=body
        ).execute(num_retries=NUM_RETRIES)
        return self._wait_for_operation_to_complete(operation["name"])

    def update_function(self, name, body, update_mask):
        """
        Patches the cloud function according to the update mask specified,
        then waits for the resulting operation to finish.

        :param name: name of the function
        :type name: str
        :param body: body required by the cloud function patch API
        :type body: str
        :param update_mask: fields that should be patched
        :type update_mask: [str]
        :return: response returned by the completed operation
        :rtype: dict
        """
        functions = self.get_conn().projects().locations().functions()
        operation = functions.patch(
            updateMask=",".join(update_mask),
            name=name,
            body=body
        ).execute(num_retries=NUM_RETRIES)
        return self._wait_for_operation_to_complete(operation["name"])

    def upload_function_zip(self, parent, zip_path):
        """
        Uploads a zip file with the function sources.

        :param parent: project and location for which the signed upload URL
            should be generated, in the form
            projects/<PROJECT>/locations/<LOCATION>
        :type parent: str
        :param zip_path: path of the file to upload (should point to a valid
            .zip file)
        :type zip_path: str
        :return: upload URL returned by the generateUploadUrl method
        """
        functions = self.get_conn().projects().locations().functions()
        response = functions.generateUploadUrl(
            parent=parent).execute(num_retries=NUM_RETRIES)
        upload_url = response.get('uploadUrl')
        with open(zip_path, 'rb') as archive:
            payload = archive.read()
        requests.put(
            url=upload_url,
            data=payload,
            # Those two headers need to be specified according to:
            # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/generateUploadUrl
            # nopep8
            headers={
                'Content-type': 'application/zip',
                'x-goog-content-length-range': '0,104857600',
            }
        )
        return upload_url

    def delete_function(self, name):
        """
        Deletes the cloud function with the given name, then waits for the
        resulting operation to finish.

        :param name: name of the function
        :type name: str
        :return: response returned by the completed operation
        :rtype: dict
        """
        functions = self.get_conn().projects().locations().functions()
        operation = functions.delete(
            name=name).execute(num_retries=NUM_RETRIES)
        return self._wait_for_operation_to_complete(operation["name"])

    def _wait_for_operation_to_complete(self, operation_name):
        """
        Waits for the named operation to complete, polling the status of the
        asynchronous call.

        :param operation_name: name of the operation
        :type operation_name: str
        :return: response returned by the operation
        :rtype: dict
        :exception: AirflowException in case an error is returned
        """
        operations = self.get_conn().operations()
        while True:
            status = operations.get(
                name=operation_name,
            ).execute(num_retries=NUM_RETRIES)
            if not status.get("done"):
                time.sleep(TIME_TO_SLEEP_IN_SECONDS)
                continue
            # Note: according to the documentation, exactly one of
            # "response"/"error" is set once "done" == True.
            error = status.get("error")
            if error:
                raise AirflowException(str(error))
            return status.get("response")

0 commit comments

Comments
 (0)