From 5abe374c681d3bbe3ef0f37f00413ae8b4f6fd53 Mon Sep 17 00:00:00 2001
From: Chenglong Yan
Date: Wed, 24 Aug 2022 23:25:51 +0800
Subject: [PATCH] Migrate Google example presto_to_gcs to new design AIP-47

related: #22447, #22430
---
 .../operators/transfer/presto_to_gcs.rst      |  10 +-
 .../transfers/test_presto_to_gcs_system.py    | 158 ------------------
 .../cloud/gcs}/example_presto_to_gcs.py       |  62 +++++--
 3 files changed, 51 insertions(+), 179 deletions(-)
 delete mode 100644 tests/providers/google/cloud/transfers/test_presto_to_gcs_system.py
 rename {airflow/providers/google/cloud/example_dags => tests/system/providers/google/cloud/gcs}/example_presto_to_gcs.py (79%)

diff --git a/docs/apache-airflow-providers-google/operators/transfer/presto_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/presto_to_gcs.rst
index c462ee9abaa6c..ca64afed2d466 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/presto_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/presto_to_gcs.rst
@@ -49,7 +49,7 @@ All parameters are described in the reference documentation - :class:`~airflow.p
 
 An example operator call might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_presto_to_gcs_basic]
@@ -67,7 +67,7 @@ You can specify these options by the ``export_format`` parameter.
 
 If you want a CSV file to be created, your operator call might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_presto_to_gcs_csv]
@@ -81,7 +81,7 @@ will be dumped from the database and upload to the bucket.
 
 If you want to create a schema file, then an example operator call might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_presto_to_gcs_multiple_types]
@@ -102,7 +102,7 @@ maximum allowed file size for a single object.
 
 If you want to create 10 MB files, your code might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
     :language: python
     :dedent: 4
    :start-after: [START howto_operator_read_data_from_gcs_many_chunks]
@@ -123,7 +123,7 @@ For example, if you want to create an external table that allows you to create q
 read data directly from GCS, then you can use
 :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator`.
 Using this operator looks like this:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
     :language: python
     :dedent: 4
    :start-after: [START howto_operator_create_external_table_multiple_types]
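
The ``exampleinclude`` directives above pull their snippets from the example file this patch relocates, so the operator calls themselves never appear in the diff. For orientation, a minimal sketch of the kind of call the ``howto_operator_presto_to_gcs_basic`` block contains (the bucket name and SQL here are illustrative, not quoted from the file)::

    from airflow.providers.google.cloud.transfers.presto_to_gcs import PrestoToGCSOperator

    # Export the result of a Presto query to GCS as newline-delimited JSON.
    presto_to_gcs_basic = PrestoToGCSOperator(
        task_id="presto_to_gcs_basic",
        sql="SELECT * FROM memory.default.test_multiple_types",
        bucket="my-example-bucket",  # illustrative bucket name
        filename="test_multiple_types.{}.json",  # "{}" is replaced with a chunk counter
    )

The CSV and schema-file variants the documentation describes differ only in extra parameters on the same operator: ``export_format="csv"`` switches the output format, ``schema_filename`` additionally dumps a BigQuery-style schema file, and ``approx_max_file_size_bytes`` controls when the output is split into multiple objects.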
diff --git a/tests/providers/google/cloud/transfers/test_presto_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_presto_to_gcs_system.py
deleted file mode 100644
index ba0bf8a9428b0..0000000000000
--- a/tests/providers/google/cloud/transfers/test_presto_to_gcs_system.py
+++ /dev/null
@@ -1,158 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import os
-from contextlib import closing, suppress
-
-import pytest
-
-from airflow.models import Connection
-from airflow.providers.presto.hooks.presto import PrestoHook
-from airflow.utils.session import create_session
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY, GCP_GCS_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-GCS_BUCKET = os.environ.get("GCP_PRESTO_TO_GCS_BUCKET_NAME", "test-presto-to-gcs-bucket")
-DATASET_NAME = os.environ.get("GCP_PRESTO_TO_GCS_DATASET_NAME", "test_presto_to_gcs_dataset")
-
-CREATE_QUERY = """
-CREATE TABLE memory.default.test_multiple_types (
-    -- Boolean
-    z_boolean BOOLEAN,
-    -- Integers
-    z_tinyint TINYINT,
-    z_smallint SMALLINT,
-    z_integer INTEGER,
-    z_bigint BIGINT,
-    -- Floating-Point
-    z_real REAL,
-    z_double DOUBLE,
-    -- Fixed-Point
-    z_decimal DECIMAL(10,2),
-    -- String
-    z_varchar VARCHAR(20),
-    z_char CHAR(20),
-    z_varbinary VARBINARY,
-    z_json JSON,
-    -- Date and Time
-    z_date DATE,
-    z_time TIME,
-    z_time_with_time_zone TIME WITH TIME ZONE,
-    z_timestamp TIMESTAMP,
-    z_timestamp_with_time_zone TIMESTAMP WITH TIME ZONE,
-    -- Network Address
-    z_ipaddress_v4 IPADDRESS,
-    z_ipaddress_v6 IPADDRESS,
-    -- UUID
-    z_uuid UUID
-)
-"""
-
-LOAD_QUERY = """
-INSERT INTO memory.default.test_multiple_types VALUES(
-    -- Boolean
-    true,                                                     -- z_boolean BOOLEAN,
-    -- Integers
-    CAST(POW(2, 7 ) - 42 AS TINYINT),                         -- z_tinyint TINYINT,
-    CAST(POW(2, 15) - 42 AS SMALLINT),                        -- z_smallint SMALLINT,
-    CAST(POW(2, 31) - 42 AS INTEGER),                         -- z_integer INTEGER,
-    CAST(POW(2, 32) - 42 AS BIGINT) * 2,                      -- z_bigint BIGINT,
-    -- Floating-Point
-    REAL '42',                                                -- z_real REAL,
-    DOUBLE '1.03e42',                                         -- z_double DOUBLE,
-    -- Floating-Point
-    DECIMAL '1.1',                                            -- z_decimal DECIMAL(10, 2),
-    -- String
-    U&'Hello winter \2603 !',                                 -- z_vaarchar VARCHAR(20),
-    'cat',                                                    -- z_char CHAR(20),
-    X'65683F',                                                -- z_varbinary VARBINARY,
-    CAST('["A", 1, true]' AS JSON),                           -- z_json JSON,
-    -- Date and Time
-    DATE '2001-08-22',                                        -- z_date DATE,
-    TIME '01:02:03.456',                                      -- z_time TIME,
-    TIME '01:02:03.456 America/Los_Angeles',                  -- z_time_with_time_zone TIME WITH TIME ZONE,
-    TIMESTAMP '2001-08-22 03:04:05.321',                      -- z_timestamp TIMESTAMP,
-    TIMESTAMP '2001-08-22 03:04:05.321 America/Los_Angeles',  -- z_timestamp_with_time_zone TIMESTAMP WITH TIME
-    -- ZONE,
-    -- Network Address
-    IPADDRESS '10.0.0.1',                                     -- z_ipaddress_v4 IPADDRESS,
-    IPADDRESS '2001:db8::1',                                  -- z_ipaddress_v6 IPADDRESS,
-    -- UUID
-    UUID '12151fd2-7586-11e9-8f9e-2a86e4085a59'               -- z_uuid UUID
-)
-"""
-DELETE_QUERY = "DROP TABLE memory.default.test_multiple_types"
-
-
-@pytest.mark.integration("presto")
-class PrestoToGCSSystemTest(GoogleSystemTest):
-    @staticmethod
-    def init_connection():
-        with create_session() as session:
-            session.query(Connection).filter(Connection.conn_id == "presto_default").delete()
-            session.merge(
-                Connection(
-                    conn_id="presto_default", conn_type="conn_type", host="presto", port=8080, login="airflow"
-                )
-            )
-
-    @staticmethod
-    def init_db():
-        hook = PrestoHook()
-        with hook.get_conn() as conn:
-            with closing(conn.cursor()) as cur:
-                cur.execute(CREATE_QUERY)
-                # Presto does not execute queries until the result is fetched. :-(
-                cur.fetchone()
-                cur.execute(LOAD_QUERY)
-                cur.fetchone()
-
-    @staticmethod
-    def drop_db():
-        hook = PrestoHook()
-        with hook.get_conn() as conn:
-            with closing(conn.cursor()) as cur:
-                cur.execute(DELETE_QUERY)
-                # Presto does not execute queries until the result is fetched. :-(
-                cur.fetchone()
-
-    @provide_gcp_context(GCP_GCS_KEY)
-    def setUp(self):
-        super().setUp()
-        self.init_connection()
-        self.create_gcs_bucket(GCS_BUCKET)
-        with suppress(Exception):
-            self.drop_db()
-        self.init_db()
-        self.execute_with_ctx(
-            ["bq", "rm", "--recursive", "--force", f"{self._project_id()}:{DATASET_NAME}"],
-            key=GCP_BIGQUERY_KEY,
-        )
-
-    @provide_gcp_context(GCP_BIGQUERY_KEY)
-    def test_run_example_dag(self):
-        self.run_dag("example_presto_to_gcs", CLOUD_DAG_FOLDER)
-
-    @provide_gcp_context(GCP_GCS_KEY)
-    def tearDown(self):
-        self.delete_gcs_bucket(GCS_BUCKET)
-        self.drop_db()
-        self.execute_with_ctx(
-            ["bq", "rm", "--recursive", "--force", f"{self._project_id()}:{DATASET_NAME}"],
-            key=GCP_BIGQUERY_KEY,
-        )
-        super().tearDown()
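
The deleted suite also documented a Presto client quirk worth keeping in mind: as its comments note, the DB-API client does not execute a statement until a result is fetched. A condensed sketch of the pattern its ``init_db``/``drop_db`` helpers relied on (the connection and table come from the test above)::

    from contextlib import closing

    from airflow.providers.presto.hooks.presto import PrestoHook

    hook = PrestoHook()  # resolves the "presto_default" connection
    with hook.get_conn() as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("DROP TABLE memory.default.test_multiple_types")
            cur.fetchone()  # forces the statement to actually run

Under the new AIP-47 design this wrapper class is no longer needed: setup and teardown become ordinary tasks inside the DAG itself, as the renamed example below shows.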
diff --git a/airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
similarity index 79%
rename from airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
rename to tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
index 2d939f02843aa..87a4cd9e15676 100644
--- a/airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
@@ -29,11 +29,15 @@
     BigQueryDeleteDatasetOperator,
     BigQueryInsertJobOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.presto_to_gcs import PrestoToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_presto_to_gcs"
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", 'example-project')
-GCS_BUCKET = os.environ.get("GCP_PRESTO_TO_GCS_BUCKET_NAME", "INVALID BUCKET NAME")
-DATASET_NAME = os.environ.get("GCP_PRESTO_TO_GCS_DATASET_NAME", "test_presto_to_gcs_dataset")
+GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 
 SOURCE_MULTIPLE_TYPES = "memory.default.test_multiple_types"
 SOURCE_CUSTOMER_TABLE = "tpch.sf1.customer"
@@ -47,16 +51,26 @@ def safe_name(s: str) -> str:
 
 
 with models.DAG(
-    dag_id="example_presto_to_gcs",
+    dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "presto"],
 ) as dag:
-    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)
 
     delete_dataset = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=GCS_BUCKET, project_id=GCP_PROJECT_ID
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=GCS_BUCKET, trigger_rule=TriggerRule.ALL_DONE
     )
 
     # [START howto_operator_presto_to_gcs_basic]
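
Two details of the new constants and operators deserve a note. The teardown tasks (``delete_dataset``, ``delete_bucket``) are created with ``trigger_rule=TriggerRule.ALL_DONE`` so cleanup fires even when an upstream test task fails, and the per-environment resource names keep concurrent system-test runs from colliding. A small sketch of how those names resolve (the environment value is illustrative)::

    import os

    # With SYSTEM_TESTS_ENV_ID=abc123 (illustrative), the resources become:
    ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
    DAG_ID = "example_presto_to_gcs"
    GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"     # bucket_example_presto_to_gcs_abc123
    DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"  # dataset_example_presto_to_gcs_abc123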
@@ -178,15 +192,31 @@ def safe_name(s: str) -> str:
     )
     # [END howto_operator_presto_to_gcs_csv]
 
-    create_dataset >> presto_to_gcs_basic
-    create_dataset >> presto_to_gcs_multiple_types
-    create_dataset >> presto_to_gcs_many_chunks
-    create_dataset >> presto_to_gcs_csv
+    (
+        # TEST SETUP
+        create_dataset
+        >> create_bucket
+        # TEST BODY
+        >> presto_to_gcs_basic
+        >> presto_to_gcs_multiple_types
+        >> create_external_table_multiple_types
+        >> read_data_from_gcs_multiple_types
+        >> presto_to_gcs_many_chunks
+        >> create_external_table_many_chunks
+        >> read_data_from_gcs_many_chunks
+        >> presto_to_gcs_csv
+        # TEST TEARDOWN
+        >> delete_dataset
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
-    presto_to_gcs_multiple_types >> create_external_table_multiple_types >> read_data_from_gcs_multiple_types
-    presto_to_gcs_many_chunks >> create_external_table_many_chunks >> read_data_from_gcs_many_chunks
+from tests.system.utils import get_test_run  # noqa: E402
 
-    presto_to_gcs_basic >> delete_dataset
-    presto_to_gcs_csv >> delete_dataset
-    read_data_from_gcs_multiple_types >> delete_dataset
-    read_data_from_gcs_many_chunks >> delete_dataset
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
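
A note on the watcher wiring at the end of the DAG: a DAG run's terminal state follows its leaf tasks, and because the teardown tasks use ``ALL_DONE`` they succeed even when a body task failed, which would mark a broken run green. ``watcher()`` adds one more leaf that fails in exactly that case. Reconstructed from its documented behavior (not quoted from this patch), ``tests/system/utils/watcher.py`` amounts to roughly::

    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule

    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        """Fail the DAG run if any watched upstream task has failed."""
        raise AirflowException("Failing task because one or more upstream tasks failed.")

The ``get_test_run(dag)`` hook at the bottom is what lets a plain ``pytest tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py`` invocation execute the DAG, per ``tests/system/README.md#run_via_pytest``.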