Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-3239] Further fix of CI tests #4131

Merged
merged 1 commit into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,5 @@
from __future__ import absolute_import

from .api import *
from .configuration import *
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason we remove these import lines?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was needed to find the tests before, but now the files are correctly named with test_*.py the more standard test discovery process finds them. If you've noticed in the logs something about a "Transplanted" class or similar, this was the cause, and naming them properly makes it behave.

Copy link
Member Author

@XD-DENG XD-DENG Nov 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @feng-tao , for more details, you can refer to JIRA ticket https://issues.apache.org/jira/browse/AIRFLOW-3239, and another two PRs (#4049, #4074)

Some history:

  • Firstly, I found some tests were never run since they were not named correctly. Test discovery process fails to find them (like tests for DockerOperator before [AIRFLOW-3203] Fix DockerOperator & some operator test #4049).
  • Some test files were not named correctly, yet they were still being run, because of usage like from .configuration import * in tests/__init__.py — which may not be the most proper method, as @ashb shared above.

Any import lines removed in tests/__init__.py come together with a test file renaming like tests/configuration.py → tests/test_configuration.py.

Please let me know if you need any further clarification. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the info.

from .contrib import *
from .core import *
from .executors import *
from .jobs import *
from .impersonation import *
from .lineage import *
from .models import *
from .operators import *
from .security import *
from .task import *
from .utils import *
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def test_echo_env_variables(self):
'echo $AIRFLOW_CTX_EXECUTION_DATE>> {0};'
'echo $AIRFLOW_CTX_DAG_RUN_ID>> {0};'.format(fname)
)

original_AIRFLOW_HOME = os.environ['AIRFLOW_HOME']

os.environ['AIRFLOW_HOME'] = 'MY_PATH_TO_AIRFLOW_HOME'
t.run(DEFAULT_DATE, DEFAULT_DATE,
ignore_first_depends_on_past=True, ignore_ti_state=True)
Expand All @@ -78,3 +81,5 @@ def test_echo_env_variables(self):
self.assertIn('echo_env_vars', output)
self.assertIn(DEFAULT_DATE.isoformat(), output)
self.assertIn('manual__' + DEFAULT_DATE.isoformat(), output)

os.environ['AIRFLOW_HOME'] = original_AIRFLOW_HOME
46 changes: 46 additions & 0 deletions tests/operators/operators.py → tests/operators/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ def tearDown(self):
for table in drop_tables:
conn.execute("DROP TABLE IF EXISTS {}".format(table))

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_operator_test(self):
sql = """
CREATE TABLE IF NOT EXISTS test_airflow (
Expand All @@ -66,8 +68,11 @@ def test_mysql_operator_test(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_operator_test_multi(self):
sql = [
"CREATE TABLE IF NOT EXISTS test_airflow (dummy VARCHAR(50))",
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
Expand All @@ -79,6 +84,8 @@ def test_mysql_operator_test_multi(self):
)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_hook_test_bulk_load(self):
records = ("foo", "bar", "baz")

Expand All @@ -101,6 +108,8 @@ def test_mysql_hook_test_bulk_load(self):
results = tuple(result[0] for result in c.fetchall())
self.assertEqual(sorted(results), sorted(records))

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_hook_test_bulk_dump(self):
from airflow.hooks.mysql_hook import MySqlHook
hook = MySqlHook('airflow_db')
Expand All @@ -112,6 +121,8 @@ def test_mysql_hook_test_bulk_dump(self):
self.skipTest("Skip test_mysql_hook_test_bulk_load "
"since file output is not permitted")

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
@mock.patch('airflow.hooks.mysql_hook.MySqlHook.get_conn')
def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn):
mock_execute = mock.MagicMock()
Expand All @@ -131,6 +142,8 @@ def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn):
""".format(tmp_file=tmp_file, table=table)
assertEqualIgnoreMultipleSpaces(self, mock_execute.call_args[0][0], query)

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_mysql(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
from airflow.operators.generic_transfer import GenericTransfer
Expand All @@ -148,6 +161,8 @@ def test_mysql_to_mysql(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_overwrite_schema(self):
"""
Verifies option to overwrite connection schema
Expand Down Expand Up @@ -177,6 +192,16 @@ def setUp(self):
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag

def tearDown(self):
tables_to_drop = ['test_postgres_to_postgres', 'test_airflow']
from airflow.hooks.postgres_hook import PostgresHook
with PostgresHook().get_conn() as conn:
with conn.cursor() as cur:
for t in tables_to_drop:
cur.execute("DROP TABLE IF EXISTS {}".format(t))

@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_postgres_operator_test(self):
sql = """
CREATE TABLE IF NOT EXISTS test_airflow (
Expand All @@ -197,8 +222,11 @@ def test_postgres_operator_test(self):
end_date=DEFAULT_DATE,
ignore_ti_state=True)

@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_postgres_operator_test_multi(self):
sql = [
"CREATE TABLE IF NOT EXISTS test_airflow (dummy VARCHAR(50))",
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
Expand All @@ -207,6 +235,8 @@ def test_postgres_operator_test_multi(self):
task_id='postgres_operator_test_multi', sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_postgres_to_postgres(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
from airflow.operators.generic_transfer import GenericTransfer
Expand All @@ -224,6 +254,8 @@ def test_postgres_to_postgres(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_vacuum(self):
"""
Verifies the VACUUM operation runs well with the PostgresOperator
Expand All @@ -238,6 +270,8 @@ def test_vacuum(self):
autocommit=True)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_overwrite_schema(self):
"""
Verifies option to overwrite connection schema
Expand Down Expand Up @@ -343,11 +377,15 @@ def tearDown(self):
with MySqlHook().get_conn() as cur:
cur.execute("DROP TABLE IF EXISTS baby_names CASCADE;")

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_clear(self):
self.dag.clear(
start_date=DEFAULT_DATE,
end_date=timezone.utcnow())

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive(self):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
Expand All @@ -361,6 +399,8 @@ def test_mysql_to_hive(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive_partition(self):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
Expand All @@ -376,6 +416,8 @@ def test_mysql_to_hive_partition(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive_tblproperties(self):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
Expand All @@ -390,6 +432,8 @@ def test_mysql_to_hive_tblproperties(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
@mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
def test_mysql_to_hive_type_conversion(self, mock_load_file):
mysql_table = 'test_mysql_to_hive'
Expand Down Expand Up @@ -433,6 +477,8 @@ def test_mysql_to_hive_type_conversion(self, mock_load_file):
with m.get_conn() as c:
c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))

@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive_verify_loaded_values(self):
mysql_table = 'test_mysql_to_hive'
hive_table = 'test_mysql_to_hive'
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.