Skip to content

Commit 0be002e

Browse files
ashbbolkedebruin
authored andcommitted
[AIRFLOW-2140] Don't require kubernetes for the SparkSubmit hook (apache#3700)
This extra dep is a quasi-breaking change when upgrading - previously there were no deps outside of Airflow itself for this hook. Importing the k8s libs breaks installs that aren't also using Kubernetes. This makes the dep optional for anyone who doesn't explicitly use the functionality
1 parent 142a942 commit 0be002e

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

airflow/contrib/hooks/spark_submit_hook.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from airflow.exceptions import AirflowException
2727
from airflow.utils.log.logging_mixin import LoggingMixin
2828
from airflow.contrib.kubernetes import kube_client
29-
from kubernetes.client.rest import ApiException
3029

3130

3231
class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -136,6 +135,10 @@ def __init__(self,
136135
self._connection = self._resolve_connection()
137136
self._is_yarn = 'yarn' in self._connection['master']
138137
self._is_kubernetes = 'k8s' in self._connection['master']
138+
if self._is_kubernetes and kube_client is None:
139+
raise RuntimeError(
140+
"{master} specified by kubernetes dependencies are not installed!".format(
141+
self._connection['master']))
139142

140143
self._should_track_driver_status = self._resolve_should_track_driver_status()
141144
self._driver_id = None
@@ -559,6 +562,6 @@ def on_kill(self):
559562

560563
self.log.info("Spark on K8s killed with response: %s", api_response)
561564

562-
except ApiException as e:
565+
except kube_client.ApiException as e:
563566
self.log.info("Exception when attempting to kill Spark on K8s:")
564567
self.log.exception(e)

airflow/contrib/kubernetes/kube_client.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,21 @@
1717
from airflow.configuration import conf
1818
from six import PY2
1919

20+
try:
21+
from kubernetes import config, client
22+
from kubernetes.client.rest import ApiException
23+
has_kubernetes = True
24+
except ImportError as e:
25+
# We need an exception class to be able to use it in ``except`` elsewhere
26+
# in the code base
27+
ApiException = BaseException
28+
has_kubernetes = False
29+
_import_err = e
30+
2031

2132
def _load_kube_config(in_cluster, cluster_context, config_file):
22-
from kubernetes import config, client
33+
if not has_kubernetes:
34+
raise _import_err
2335
if in_cluster:
2436
config.load_incluster_config()
2537
else:

0 commit comments

Comments
 (0)