From 7922fb29921a8ca8bbadf7e47eaba6f94c2b3ac5 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Thu, 4 Oct 2018 03:20:24 -0400 Subject: [PATCH] [AIRFLOW-3103][AIRFLOW-3147] Update flask-appbuilder (#3937) --- UPDATING.md | 142 ++++++- .../auth/backends/github_enterprise_auth.py | 3 + airflow/contrib/auth/backends/google_auth.py | 3 + .../contrib/auth/backends/kerberos_auth.py | 5 +- airflow/contrib/auth/backends/ldap_auth.py | 5 +- .../contrib/auth/backends/password_auth.py | 5 +- airflow/default_login.py | 3 + airflow/www/utils.py | 8 +- airflow/www/views.py | 2 +- airflow/www_rbac/decorators.py | 2 +- airflow/www_rbac/security.py | 395 +++++++++++++++--- .../templates/appbuilder/navbar_right.html | 2 +- setup.py | 4 +- tests/www/test_utils.py | 6 +- tests/www_rbac/test_security.py | 104 +++-- 15 files changed, 570 insertions(+), 119 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 3b47882212225..74337f3fe88de 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -5,15 +5,12 @@ assists users migrating to a new version. ## Airflow Master -## Airflow 1.10 - -Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or -`AIRFLOW_GPL_UNIDECODE=yes`. In case of the latter a GPL runtime dependency will be installed due to a -dependency (python-nvd3 -> python-slugify -> unidecode). +### Rename of BashTaskRunner to StandardTaskRunner -### Replace DataProcHook.await calls to DataProcHook.wait +BashTaskRunner has been renamed to StandardTaskRunner. It is the default task runner +so you might need to update your config. -The method name was changed to be compatible with the Python 3.7 async/await keywords +`task_runner = StandardTaskRunner` ### DAG level Access Control for new RBAC UI @@ -23,9 +20,68 @@ that he has permissions on. If a new role wants to access all the dags, the admi We also provide a new cli command(``sync_perm``) to allow admin to auto sync permissions. + +### min_file_parsing_loop_time config option temporarily disabled + +The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to +some bugs. + +### new `sync_parallelism` config option in celery section + +The new `sync_parallelism` config option will control how many processes CeleryExecutor will use to +fetch celery task state in parallel. Default value is max(1, number of cores - 1) + +### CLI Changes + +The ability to manipulate users from the command line has been changed. 'airflow create_user' and 'airflow delete_user' and 'airflow list_users' has been grouped to a single command `airflow users` with optional flags `--create`, `--list` and `--delete`. + +Example Usage: + +To create a new user: +```bash +airflow users --create --username jondoe --lastname doe --firstname jon --email jdoe@apache.org --role Viewer --password test +``` + +To list users: +```bash +airflow users --list +``` + +To delete a user: +```bash +airflow users --delete --username jondoe +``` + +### Custom auth backends interface change + +We have updated the version of flask-login we depend upon, and as a result any +custom auth backends might need a small change: `is_active`, +`is_authenticated`, and `is_anonymous` should now be properties. 
What this means is if +previously you had this in your user class + + def is_active(self): + return self.active + +then you need to change it like this + + @property + def is_active(self): + return self.active + +## Airflow 1.10 + +Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or +`AIRFLOW_GPL_UNIDECODE=yes`. In case of the latter a GPL runtime dependency will be installed due to a +dependency (python-nvd3 -> python-slugify -> unidecode). + +### Replace DataProcHook.await calls to DataProcHook.wait + +The method name was changed to be compatible with the Python 3.7 async/await keywords + ### Setting UTF-8 as default mime_charset in email utils ### Add a configuration variable(default_dag_run_display_number) to control numbers of dag run for display + Add a configuration variable(default_dag_run_display_number) under webserver section to control num of dag run to show in UI. ### Default executor for SubDagOperator is changed to SequentialExecutor @@ -55,11 +111,13 @@ Run `airflow webserver` to start the new UI. This will bring up a log in page, e There are five roles created for Airflow by default: Admin, User, Op, Viewer, and Public. To configure roles/permissions, go to the `Security` tab and click `List Roles` in the new UI. #### Breaking changes + - AWS Batch Operator renamed property queue to job_queue to prevent conflict with the internal queue from CeleryExecutor - AIRFLOW-2542 - Users created and stored in the old users table will not be migrated automatically. FAB's built-in authentication support must be reconfigured. - Airflow dag home page is now `/home` (instead of `/admin`). - All ModelViews in Flask-AppBuilder follow a different pattern from Flask-Admin. The `/admin` part of the url path will no longer exist. For example: `/admin/connection` becomes `/connection/list`, `/admin/connection/new` becomes `/connection/add`, `/admin/connection/edit` becomes `/connection/edit`, etc. - Due to security concerns, the new webserver will no longer support the features in the `Data Profiling` menu of old UI, including `Ad Hoc Query`, `Charts`, and `Known Events`. +- HiveServer2Hook.get_results() always returns a list of tuples, even when a single column is queried, as per Python API 2. ### airflow.contrib.sensors.hdfs_sensors renamed to airflow.contrib.sensors.hdfs_sensor @@ -73,6 +131,7 @@ to have specified `explicit_defaults_for_timestamp=1` in your my.cnf under `[mys ### Celery config To make the config of Airflow compatible with Celery, some properties have been renamed: + ``` celeryd_concurrency -> worker_concurrency celery_result_backend -> result_backend @@ -80,29 +139,35 @@ celery_ssl_active -> ssl_active celery_ssl_cert -> ssl_cert celery_ssl_key -> ssl_key ``` + Resulting in the same config parameters as Celery 4, with more transparency. ### GCP Dataflow Operators + Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a default "airflow-version" label, please upgrade your google-cloud-dataflow or apache-beam version to 2.2.0 or greater. ### BigQuery Hooks and Operator + The `bql` parameter passed to `BigQueryOperator` and `BigQueryBaseCursor.run_query` has been deprecated and renamed to `sql` for consistency purposes. Using `bql` will still work (and raise a `DeprecationWarning`), but is no longer supported and will be removed entirely in Airflow 2.0 ### Redshift to S3 Operator + With Airflow 1.9 or lower, Unload operation always included header row. 
In order to include header row, we need to turn off parallel unload. It is preferred to perform unload operation using all nodes so that it is faster for larger tables. So, parameter called `include_header` is added and default is set to False. -Header row will be added only if this parameter is set True and also in that case parallel will be automatically turned off (`PARALLEL OFF`) +Header row will be added only if this parameter is set True and also in that case parallel will be automatically turned off (`PARALLEL OFF`) ### Google cloud connection string With Airflow 1.9 or lower, there were two connection strings for the Google Cloud operators, both `google_cloud_storage_default` and `google_cloud_default`. This can be confusing and therefore the `google_cloud_storage_default` connection id has been replaced with `google_cloud_default` to make the connection id consistent across Airflow. ### Logging Configuration + With Airflow 1.9 or lower, `FILENAME_TEMPLATE`, `PROCESSOR_FILENAME_TEMPLATE`, `LOG_ID_TEMPLATE`, `END_OF_LOG_MARK` were configured in `airflow_local_settings.py`. These have been moved into the configuration file, and hence if you were using a custom configuration file the following defaults need to be added. + ``` [core] fab_logging_level = WARN @@ -110,27 +175,40 @@ log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number log_processor_filename_template = {{ filename }}.log [elasticsearch] -elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}} +elasticsearch_log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} elasticsearch_end_of_log_mark = end_of_log ``` +The previous setting of `log_task_reader` is not needed in many cases now when using the default logging config with remote storages. (Previously it needed to be set to `s3.task` or similar. This is not needed with the default config anymore) + +#### Change of per-task log path + +With the change to Airflow core to be timezone aware the default log path for task instances will now include timezone information. This will by default mean all previous task logs won't be found. You can get the old behaviour back by setting the following config options: + +``` +[core] +log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime("%%Y-%%m-%%dT%%H:%%M:%%S") }}/{{ try_number }}.log +``` + ## Airflow 1.9 ### SSH Hook updates, along with new SSH Operator & SFTP Operator SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible. - - update SSHHook constructor - - use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer to test_ssh_operator.py for usage info. - - SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer to test_sftp_operator.py.py for usage info. - - No updates are required if you are using ftpHook, it will continue to work as is. + +- update SSHHook constructor +- use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer to test_ssh_operator.py for usage info. +- SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer to test_sftp_operator.py.py for usage info. +- No updates are required if you are using ftpHook, it will continue to work as is. ### S3Hook switched to use Boto3 The airflow.hooks.S3_hook.S3Hook has been switched to use boto3 instead of the older boto (a.k.a. boto2). 
This results in a few backwards incompatible changes to the following classes: S3Hook: - - the constructors no longer accepts `s3_conn_id`. It is now called `aws_conn_id`. - - the default connection is now "aws_default" instead of "s3_default" - - the return type of objects returned by `get_bucket` is now boto3.s3.Bucket - - the return type of `get_key`, and `get_wildcard_key` is now an boto3.S3.Object. + +- the constructors no longer accepts `s3_conn_id`. It is now called `aws_conn_id`. +- the default connection is now "aws_default" instead of "s3_default" +- the return type of objects returned by `get_bucket` is now boto3.s3.Bucket +- the return type of `get_key`, and `get_wildcard_key` is now an boto3.S3.Object. If you are using any of these in your DAGs and specify a connection ID you will need to update the parameter name for the connection to "aws_conn_id": S3ToHiveTransfer, S3PrefixSensor, S3KeySensor, RedshiftToS3Transfer. @@ -306,10 +384,11 @@ The `file_task_handler` logger has been made more flexible. The default format c If you are logging to Google cloud storage, please see the [Google cloud platform documentation](https://airflow.incubator.apache.org/integration.html#gcp-google-cloud-platform) for logging instructions. If you are using S3, the instructions should be largely the same as the Google cloud platform instructions above. You will need a custom logging config. The `REMOTE_BASE_LOG_FOLDER` configuration key in your airflow config has been removed, therefore you will need to take the following steps: - - Copy the logging configuration from [`airflow/config_templates/airflow_logging_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py). - - Place it in a directory inside the Python import path `PYTHONPATH`. If you are using Python 2.7, ensuring that any `__init__.py` files exist so that it is importable. - - Update the config by setting the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The `REMOTE_BASE_LOG_FOLDER` key is not used anymore. - - Set the `logging_config_class` to the filename and dict. For example, if you place `custom_logging_config.py` on the base of your pythonpath, you will need to set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config as Airflow 1.8. + +- Copy the logging configuration from [`airflow/config_templates/airflow_logging_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py). +- Place it in a directory inside the Python import path `PYTHONPATH`. If you are using Python 2.7, ensuring that any `__init__.py` files exist so that it is importable. +- Update the config by setting the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The `REMOTE_BASE_LOG_FOLDER` key is not used anymore. +- Set the `logging_config_class` to the filename and dict. For example, if you place `custom_logging_config.py` on the base of your pythonpath, you will need to set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config as Airflow 1.8. ### New Features @@ -318,8 +397,10 @@ If you are using S3, the instructions should be largely the same as the Google c A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters. ### Deprecated Features + These features are marked for deprecation. 
They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0 + - If you're using the `google_cloud_conn_id` or `dataproc_cluster` argument names explicitly in `contrib.operators.Dataproc{*}Operator`(s), be sure to rename them to `gcp_conn_id` or `cluster_name`, respectively. We've renamed these arguments for consistency. (AIRFLOW-1323) - `post_execute()` hooks now take two arguments, `context` and `result` @@ -343,30 +424,36 @@ a previously installed version of Airflow before installing 1.8.1. ## Airflow 1.8 ### Database + The database schema needs to be upgraded. Make sure to shutdown Airflow and make a backup of your database. To upgrade the schema issue `airflow upgradedb`. ### Upgrade systemd unit files + Systemd unit files have been updated. If you use systemd please make sure to update these. > Please note that the webserver does not detach properly, this will be fixed in a future version. ### Tasks not starting although dependencies are met due to stricter pool checking + Airflow 1.7.1 has issues with being able to over subscribe to a pool, ie. more slots could be used than were available. This is fixed in Airflow 1.8.0, but due to past issue jobs may fail to start although their dependencies are met after an upgrade. To workaround either temporarily increase the amount of slots above the amount of queued tasks or use a new pool. ### Less forgiving scheduler on dynamic start_date + Using a dynamic start_date (e.g. `start_date = datetime.now()`) is not considered a best practice. The 1.8.0 scheduler is less forgiving in this area. If you encounter DAGs not being scheduled you can try using a fixed start_date and renaming your DAG. The last step is required to make sure you start with a clean slate, otherwise the old schedule can interfere. ### New and updated scheduler options + Please read through the new scheduler options, defaults have changed since 1.7.1. #### child_process_log_directory + In order to increase the robustness of the scheduler, DAGS are now processed in their own process. Therefore each DAG has its own log file for the scheduler. These log files are placed in `child_process_log_directory` which defaults to `/scheduler/latest`. You will need to make sure these log files are removed. @@ -374,24 +461,30 @@ DAG has its own log file for the scheduler. These log files are placed in `child > DAG logs or processor logs ignore and command line settings for log file locations. #### run_duration + Previously the command line option `num_runs` was used to let the scheduler terminate after a certain amount of loops. This is now time bound and defaults to `-1`, which means run continuously. See also num_runs. #### num_runs + Previously `num_runs` was used to let the scheduler terminate after a certain amount of loops. Now num_runs specifies the number of times to try to schedule each DAG file within `run_duration` time. Defaults to `-1`, which means try indefinitely. This is only available on the command line. #### min_file_process_interval + After how much time should an updated DAG be picked up from the filesystem. #### min_file_parsing_loop_time +CURRENTLY DISABLED DUE TO A BUG How many seconds to wait between file-parsing loops to prevent the logs from being spammed. #### dag_dir_list_interval + The frequency with which the scheduler should relist the contents of the DAG directory. If while developing +dags, they are not being picked up, have a look at this number and decrease it when necessary. 
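For reference, a minimal sketch of how the file-processing options above might be set in the `[scheduler]` section of `airflow.cfg` (this snippet is illustrative only — the values are placeholders, not recommended defaults; check the `airflow.cfg` shipped with your version for the authoritative defaults):

```
[scheduler]
# Per-DAG scheduler logs from the child processes land here.
child_process_log_directory = /var/log/airflow/scheduler/latest
# Run continuously (-1) rather than exiting after a fixed duration.
run_duration = -1
# Minimum seconds before an updated DAG file is re-processed.
min_file_process_interval = 0
# Seconds between re-listing the contents of the DAG directory.
dag_dir_list_interval = 300
```
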
#### catchup_by_default + By default the scheduler will fill any missing interval DAG Runs between the last execution date and the current date. This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as `catchup = False / True`. Command line backfills will still work. @@ -422,6 +515,7 @@ required to whitelist these variables by adding the following to your configurat airflow\.ctx\..* ``` + ### Google Cloud Operator and Hook alignment All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection @@ -433,6 +527,7 @@ Also the old P12 key file type is not supported anymore and only the new JSON ke account. ### Deprecated Features + These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0 @@ -449,6 +544,7 @@ supported and will be removed entirely in Airflow 2.0 - The config value secure_mode will default to True which will disable some insecure endpoints/features ### Known Issues + There is a report that the default of "-1" for num_runs creates an issue where errors are reported while parsing tasks. It was not confirmed, but a workaround was found by changing the default back to `None`. @@ -475,7 +571,9 @@ To continue using the default smtp email backend, change the email_backend line [email] email_backend = airflow.utils.send_email_smtp ``` + to: + ``` [email] email_backend = airflow.utils.email.send_email_smtp @@ -488,7 +586,9 @@ To continue using S3 logging, update your config file so: ``` s3_log_folder = s3://my-airflow-log-bucket/logs ``` + becomes: + ``` remote_base_log_folder = s3://my-airflow-log-bucket/logs remote_log_conn_id = diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py index 641b81e46da63..78afee46078dc 100644 --- a/airflow/contrib/auth/backends/github_enterprise_auth.py +++ b/airflow/contrib/auth/backends/github_enterprise_auth.py @@ -47,14 +47,17 @@ class GHEUser(models.User): def __init__(self, user): self.user = user + @property def is_active(self): """Required by flask_login""" return True + @property def is_authenticated(self): """Required by flask_login""" return True + @property def is_anonymous(self): """Required by flask_login""" return False diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py index e41934b926c31..32ad6962a0a82 100644 --- a/airflow/contrib/auth/backends/google_auth.py +++ b/airflow/contrib/auth/backends/google_auth.py @@ -46,14 +46,17 @@ class GoogleUser(models.User): def __init__(self, user): self.user = user + @property def is_active(self): """Required by flask_login""" return True + @property def is_authenticated(self): """Required by flask_login""" return True + @property def is_anonymous(self): """Required by flask_login""" return False diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py index 08be299a197dd..4b5bb295d9685 100644 --- a/airflow/contrib/auth/backends/kerberos_auth.py +++ b/airflow/contrib/auth/backends/kerberos_auth.py @@ -73,14 +73,17 @@ def authenticate(username, password): return + @property def is_active(self): """Required by flask_login""" return True + @property def is_authenticated(self): """Required by flask_login""" return True + @property def is_anonymous(self): """Required by flask_login""" return False @@ -110,7 +113,7 @@ 
def load_user(userid, session=None): @provide_session def login(self, request, session=None): - if current_user.is_authenticated(): + if current_user.is_authenticated: flash("You are already logged in") return redirect(url_for('index')) diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py index eefaa1263b250..a7eb62e391763 100644 --- a/airflow/contrib/auth/backends/ldap_auth.py +++ b/airflow/contrib/auth/backends/ldap_auth.py @@ -236,14 +236,17 @@ def try_login(username, password): log.info("Password incorrect for user %s", username) raise AuthenticationError("Invalid username or password") + @property def is_active(self): """Required by flask_login""" return True + @property def is_authenticated(self): """Required by flask_login""" return True + @property def is_anonymous(self): """Required by flask_login""" return False @@ -274,7 +277,7 @@ def load_user(userid, session=None): @provide_session def login(self, request, session=None): - if current_user.is_authenticated(): + if current_user.is_authenticated: flash("You are already logged in") return redirect(url_for('admin.index')) diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py index 879aaa142a4db..55f5daf8fdf76 100644 --- a/airflow/contrib/auth/backends/password_auth.py +++ b/airflow/contrib/auth/backends/password_auth.py @@ -71,14 +71,17 @@ def password(self, plaintext): def authenticate(self, plaintext): return check_password_hash(self._password, plaintext) + @property def is_active(self): """Required by flask_login""" return True + @property def is_authenticated(self): """Required by flask_login""" return True + @property def is_anonymous(self): """Required by flask_login""" return False @@ -137,7 +140,7 @@ def authenticate(session, username, password): @provide_session def login(self, request, session=None): - if current_user.is_authenticated(): + if current_user.is_authenticated: flash("You are already logged in") return redirect(url_for('admin.index')) diff --git a/airflow/default_login.py b/airflow/default_login.py index d44dbf39ea9b3..b6453c55d0c69 100644 --- a/airflow/default_login.py +++ b/airflow/default_login.py @@ -44,14 +44,17 @@ class DefaultUser(object): def __init__(self, user): self.user = user + @property def is_active(self): """Required by flask_login""" return True + @property def is_authenticated(self): """Required by flask_login""" return True + @property def is_anonymous(self): """Required by flask_login""" return False diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 44fa5c4dcd6fb..75b998231863b 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -63,8 +63,8 @@ class LoginMixin(object): def is_accessible(self): return ( not AUTHENTICATE or ( - not current_user.is_anonymous() and - current_user.is_authenticated() + not current_user.is_anonymous and + current_user.is_authenticated ) ) @@ -73,7 +73,7 @@ class SuperUserMixin(object): def is_accessible(self): return ( not AUTHENTICATE or - (not current_user.is_anonymous() and current_user.is_superuser()) + (not current_user.is_anonymous and current_user.is_superuser()) ) @@ -81,7 +81,7 @@ class DataProfilingMixin(object): def is_accessible(self): return ( not AUTHENTICATE or - (not current_user.is_anonymous() and current_user.data_profiling()) + (not current_user.is_anonymous and current_user.data_profiling()) ) diff --git a/airflow/www/views.py b/airflow/www/views.py index 8f6725ef59b44..38b0d29d5587f 100644 --- a/airflow/www/views.py 
+++ b/airflow/www/views.py @@ -266,7 +266,7 @@ def data_profiling_required(f): def decorated_function(*args, **kwargs): if ( current_app.config['LOGIN_DISABLED'] or - (not current_user.is_anonymous() and current_user.data_profiling()) + (not current_user.is_anonymous and current_user.data_profiling()) ): return f(*args, **kwargs) else: diff --git a/airflow/www_rbac/decorators.py b/airflow/www_rbac/decorators.py index 2dd1af45df09d..41be4eb4be597 100644 --- a/airflow/www_rbac/decorators.py +++ b/airflow/www_rbac/decorators.py @@ -32,7 +32,7 @@ def action_logging(f): @functools.wraps(f) def wrapper(*args, **kwargs): session = settings.Session() - if g.user.is_anonymous(): + if g.user.is_anonymous: user = 'anonymous' else: user = g.user.username diff --git a/airflow/www_rbac/security.py b/airflow/www_rbac/security.py index d2271f822a47e..6bb67d4d8338e 100644 --- a/airflow/www_rbac/security.py +++ b/airflow/www_rbac/security.py @@ -7,22 +7,30 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +# +import logging +from flask import g from flask_appbuilder.security.sqla import models as sqla_models +from flask_appbuilder.security.sqla.manager import SecurityManager +from sqlalchemy import or_ + +from airflow import models, settings +from airflow.www_rbac.app import appbuilder ########################################################################### # VIEW MENUS ########################################################################### -viewer_vms = [ +viewer_vms = { 'Airflow', 'DagModelView', 'Browse', @@ -42,11 +50,11 @@ 'About', 'Version', 'VersionView', -] +} user_vms = viewer_vms -op_vms = [ +op_vms = { 'Admin', 'Configurations', 'ConfigurationView', @@ -58,13 +66,13 @@ 'VariableModelView', 'XComs', 'XComModelView', -] +} ########################################################################### # PERMISSIONS ########################################################################### -viewer_perms = [ +viewer_perms = { 'menu_access', 'can_index', 'can_list', @@ -75,6 +83,7 @@ 'can_task_stats', 'can_code', 'can_log', + 'can_get_logs_with_metadata', 'can_tries', 'can_graph', 'can_tree', @@ -88,9 +97,9 @@ 'can_rendered', 'can_pickle_info', 'can_version', -] +} -user_perms = [ +user_perms = { 'can_dagrun_clear', 'can_run', 'can_trigger', @@ -105,12 +114,23 @@ 'set_running', 'set_success', 'clear', -] + 'can_clear', +} -op_perms = [ +op_perms = { 'can_conf', 'can_varimport', -] +} + +# global view-menu for dag-level access +dag_vms = { + 'all_dags' +} + +dag_perms = { + 'can_dag_read', + 'can_dag_edit', +} ########################################################################### # DEFAULT ROLE CONFIGURATIONS @@ -120,60 +140,317 @@ { 'role': 'Viewer', 'perms': viewer_perms, - 'vms': viewer_vms, + 'vms': viewer_vms | dag_vms }, { 'role': 'User', - 'perms': viewer_perms + user_perms, - 'vms': viewer_vms + user_vms, + 'perms': viewer_perms | user_perms | dag_perms, + 'vms': viewer_vms | dag_vms | user_vms, }, { 'role': 'Op', - 'perms': viewer_perms + user_perms + op_perms, - 'vms': viewer_vms + user_vms + 
op_vms, + 'perms': viewer_perms | user_perms | op_perms | dag_perms, + 'vms': viewer_vms | dag_vms | user_vms | op_vms, }, ] +EXISTING_ROLES = { + 'Admin', + 'Viewer', + 'User', + 'Op', + 'Public', +} + + +class AirflowSecurityManager(SecurityManager): + + def init_role(self, role_name, role_vms, role_perms): + """ + Initialize the role with the permissions and related view-menus. + + :param role_name: + :param role_vms: + :param role_perms: + :return: + """ + pvms = self.get_session.query(sqla_models.PermissionView).all() + pvms = [p for p in pvms if p.permission and p.view_menu] + + role = self.find_role(role_name) + if not role: + role = self.add_role(role_name) + + role_pvms = [] + for pvm in pvms: + if pvm.view_menu.name in role_vms and pvm.permission.name in role_perms: + role_pvms.append(pvm) + role.permissions = list(set(role_pvms)) + self.get_session.merge(role) + self.get_session.commit() + + def get_user_roles(self, user=None): + """ + Get all the roles associated with the user. + """ + if user is None: + user = g.user + if user.is_anonymous: + public_role = appbuilder.config.get('AUTH_ROLE_PUBLIC') + return [appbuilder.security_manager.find_role(public_role)] \ + if public_role else [] + return user.roles + + def get_all_permissions_views(self): + """ + Returns a set of tuples with the perm name and view menu name + """ + perms_views = set() + for role in self.get_user_roles(): + for perm_view in role.permissions: + perms_views.add((perm_view.permission.name, perm_view.view_menu.name)) + return perms_views + + def get_accessible_dag_ids(self, username=None): + """ + Return a set of dags that user has access to(either read or write). + + :param username: Name of the user. + :return: A set of dag ids that the user could access. + """ + if not username: + username = g.user + + if username.is_anonymous or 'Public' in username.roles: + # return an empty list if the role is public + return set() + + roles = {role.name for role in username.roles} + if {'Admin', 'Viewer', 'User', 'Op'} & roles: + return dag_vms + + user_perms_views = self.get_all_permissions_views() + # return all dags that the user could access + return set([view for perm, view in user_perms_views if perm in dag_perms]) + + def has_access(self, permission, view_name, user=None): + """ + Verify whether a given user could perform certain permission + (e.g can_read, can_write) on the given dag_id. + + :param str permission: permission on dag_id(e.g can_read, can_edit). + :param str view_name: name of view-menu(e.g dag id is a view-menu as well). + :param str user: user name + :return: a bool whether user could perform certain permission on the dag_id. 
+ """ + if not user: + user = g.user + if user.is_anonymous: + return self.is_item_public(permission, view_name) + return self._has_view_access(user, permission, view_name) + + def _get_and_cache_perms(self): + """ + Cache permissions-views + """ + self.perms = self.get_all_permissions_views() + + def _has_role(self, role_name_or_list): + """ + Whether the user has this role name + """ + if not isinstance(role_name_or_list, list): + role_name_or_list = [role_name_or_list] + return any( + [r.name in role_name_or_list for r in self.get_user_roles()]) + + def _has_perm(self, permission_name, view_menu_name): + """ + Whether the user has this perm + """ + if hasattr(self, 'perms'): + if (permission_name, view_menu_name) in self.perms: + return True + # rebuild the permissions set + self._get_and_cache_perms() + return (permission_name, view_menu_name) in self.perms + + def has_all_dags_access(self): + """ + Has all the dag access in any of the 3 cases: + 1. Role needs to be in (Admin, Viewer, User, Op). + 2. Has can_dag_read permission on all_dags view. + 3. Has can_dag_edit permission on all_dags view. + """ + return ( + self._has_role(['Admin', 'Viewer', 'Op', 'User']) or + self._has_perm('can_dag_read', 'all_dags') or + self._has_perm('can_dag_edit', 'all_dags')) + + def clean_perms(self): + """ + FAB leaves faulty permissions that need to be cleaned up + """ + logging.info('Cleaning faulty perms') + sesh = self.get_session + pvms = ( + sesh.query(sqla_models.PermissionView) + .filter(or_( + sqla_models.PermissionView.permission == None, # NOQA + sqla_models.PermissionView.view_menu == None, # NOQA + )) + ) + deleted_count = pvms.delete() + sesh.commit() + if deleted_count: + logging.info('Deleted {} faulty permissions'.format(deleted_count)) + + def _merge_perm(self, permission_name, view_menu_name): + """ + Add the new permission , view_menu to ab_permission_view_role if not exists. + It will add the related entry to ab_permission + and ab_view_menu two meta tables as well. + + :param str permission_name: Name of the permission. + :param str view_menu_name: Name of the view-menu + + :return: + """ + permission = self.find_permission(permission_name) + view_menu = self.find_view_menu(view_menu_name) + pv = None + if permission and view_menu: + pv = self.get_session.query(self.permissionview_model).filter_by( + permission=permission, view_menu=view_menu).first() + if not pv and permission_name and view_menu_name: + self.add_permission_view_menu(permission_name, view_menu_name) + + def create_custom_dag_permission_view(self): + """ + Workflow: + 1. when scheduler found a new dag, we will create an entry in ab_view_menu + 2. we fetch all the roles associated with dag users. + 3. we join and create all the entries for ab_permission_view_menu + (predefined permissions * dag-view_menus) + 4. Create all the missing role-permission-views for the ab_role_permission_views + + :return: None. + """ + # todo(Tao): should we put this function here or in scheduler loop? 
+ logging.info('Fetching a set of all permission, view_menu from FAB meta-table') + + def merge_pv(perm, view_menu): + """Create permission view menu only if it doesn't exist""" + if view_menu and perm and (view_menu, perm) not in all_pvs: + self._merge_perm(perm, view_menu) + + all_pvs = set() + for pv in self.get_session.query(self.permissionview_model).all(): + if pv.permission and pv.view_menu: + all_pvs.add((pv.permission.name, pv.view_menu.name)) + + # create perm for global logical dag + for dag in dag_vms: + for perm in dag_perms: + merge_pv(perm, dag) + + # Get all the active / paused dags and insert them into a set + all_dags_models = settings.Session.query(models.DagModel)\ + .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))\ + .filter(~models.DagModel.is_subdag).all() + + for dag in all_dags_models: + for perm in dag_perms: + merge_pv(perm, dag.dag_id) + + # for all the dag-level role, add the permission of viewer + # with the dag view to ab_permission_view + all_roles = self.get_all_roles() + user_role = self.find_role('User') + + dag_role = [role for role in all_roles if role.name not in EXISTING_ROLES] + update_perm_views = [] + + # todo(tao) need to remove all_dag vm + dag_vm = self.find_view_menu('all_dags') + ab_perm_view_role = sqla_models.assoc_permissionview_role + perm_view = self.permissionview_model + view_menu = self.viewmenu_model + + # todo(tao) comment on the query + all_perm_view_by_user = settings.Session.query(ab_perm_view_role)\ + .join(perm_view, perm_view.id == ab_perm_view_role + .columns.permission_view_id)\ + .filter(ab_perm_view_role.columns.role_id == user_role.id)\ + .join(view_menu)\ + .filter(perm_view.view_menu_id != dag_vm.id) + all_perm_views = set([role.permission_view_id for role in all_perm_view_by_user]) + + for role in dag_role: + # Get all the perm-view of the role + existing_perm_view_by_user = self.get_session.query(ab_perm_view_role)\ + .filter(ab_perm_view_role.columns.role_id == role.id) + + existing_perms_views = set([role.permission_view_id + for role in existing_perm_view_by_user]) + missing_perm_views = all_perm_views - existing_perms_views + + for perm_view_id in missing_perm_views: + update_perm_views.append({'permission_view_id': perm_view_id, + 'role_id': role.id}) + + self.get_session.execute(ab_perm_view_role.insert(), update_perm_views) + self.get_session.commit() + + def update_admin_perm_view(self): + """ + Admin should have all the permission-views. + Add the missing ones to the table for admin. + + :return: None. + """ + pvms = self.get_session.query(sqla_models.PermissionView).all() + pvms = [p for p in pvms if p.permission and p.view_menu] + + admin = self.find_role('Admin') + existing_perms_vms = set(admin.permissions) + for p in pvms: + if p not in existing_perms_vms: + existing_perms_vms.add(p) + admin.permissions = list(existing_perms_vms) + self.get_session.commit() + + def sync_roles(self): + """ + 1. Init the default role(Admin, Viewer, User, Op, public) + with related permissions. + 2. Init the custom role(dag-user) with related permissions. + + :return: None. + """ + logging.info('Start syncing user roles.') + + # Create default user role. + for config in ROLE_CONFIGS: + role = config['role'] + vms = config['vms'] + perms = config['perms'] + self.init_role(role, vms, perms) + self.create_custom_dag_permission_view() + + # init existing roles, the rest role could be created through UI. 
+ self.update_admin_perm_view() + self.clean_perms() + + def sync_perm_for_dag(self, dag_id): + """ + Sync permissions for given dag id. The dag id surely exists in our dag bag + as only /refresh button will call this function -def init_role(sm, role_name, role_vms, role_perms): - sm_session = sm.get_session - pvms = sm_session.query(sqla_models.PermissionView).all() - pvms = [p for p in pvms if p.permission and p.view_menu] - - valid_perms = [p.permission.name for p in pvms] - valid_vms = [p.view_menu.name for p in pvms] - invalid_perms = [p for p in role_perms if p not in valid_perms] - if invalid_perms: - raise Exception('The following permissions are not valid: {}' - .format(invalid_perms)) - invalid_vms = [v for v in role_vms if v not in valid_vms] - if invalid_vms: - raise Exception('The following view menus are not valid: {}' - .format(invalid_vms)) - - role = sm.add_role(role_name) - role_pvms = [] - for pvm in pvms: - if pvm.view_menu.name in role_vms and pvm.permission.name in role_perms: - role_pvms.append(pvm) - role_pvms = list(set(role_pvms)) - role.permissions = role_pvms - sm_session.merge(role) - sm_session.commit() - - -def init_roles(appbuilder): - for config in ROLE_CONFIGS: - name = config['role'] - vms = config['vms'] - perms = config['perms'] - init_role(appbuilder.sm, name, vms, perms) - - -def is_view_only(user, appbuilder): - if user.is_anonymous(): - anonymous_role = appbuilder.sm.auth_role_public - return anonymous_role == 'Viewer' - - user_roles = user.roles - return len(user_roles) == 1 and user_roles[0].name == 'Viewer' + :param dag_id: + :return: + """ + for dag_perm in dag_perms: + perm_on_dag = self.find_permission_view_menu(dag_perm, dag_id) + if perm_on_dag is None: + self.add_permission_view_menu(dag_perm, dag_id) diff --git a/airflow/www_rbac/templates/appbuilder/navbar_right.html b/airflow/www_rbac/templates/appbuilder/navbar_right.html index bf5aa43221884..d42f8e2e8a82c 100644 --- a/airflow/www_rbac/templates/appbuilder/navbar_right.html +++ b/airflow/www_rbac/templates/appbuilder/navbar_right.html @@ -47,7 +47,7 @@
  • -{% if not current_user.is_anonymous() %} +{% if not current_user.is_anonymous %}
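
To summarize the interface change that runs through this patch: with the updated flask-login, `is_active`, `is_authenticated`, and `is_anonymous` are properties on the user object, and call sites read them as attributes rather than calling them as methods. The following is a minimal, self-contained sketch of that pattern — the `ExampleUser` class and `user_can_access_view` helper are illustrative names, not part of the patch:

```python
# Sketch of the flask-login >= 0.3 style this patch migrates to:
# is_active / is_authenticated / is_anonymous are properties.
from flask_login import current_user


class ExampleUser(object):  # illustrative custom user class, not from the patch
    def __init__(self, active=True):
        self.active = active

    @property
    def is_active(self):
        """Required by flask_login"""
        return self.active

    @property
    def is_authenticated(self):
        """Required by flask_login"""
        return True

    @property
    def is_anonymous(self):
        """Required by flask_login"""
        return False


def user_can_access_view():
    # Property access, no parentheses -- matching the call-site changes above.
    return current_user.is_authenticated and not current_user.is_anonymous
```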