Migrate Google example presto_to_gcs to new design AIP-47 #25941

Closed · wants to merge 1 commit
@@ -49,7 +49,7 @@ All parameters are described in the reference documentation - :class:`~airflow.p

An example operator call might look like this:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_presto_to_gcs_basic]
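The `exampleinclude` directives above pull their snippets out of the example DAG between `[START ...]` and `[END ...]` marker comments. As a standalone illustration of that slicing (the `extract` helper and the sample operator body below are hypothetical, not Sphinx or Airflow code):

```python
# Illustration only: how ``:start-after:`` / ``:end-before:`` style options
# conceptually cut a snippet out of a source file between marker comments.
source = """\
# [START howto_operator_presto_to_gcs_basic]
presto_to_gcs_basic = PrestoToGCSOperator(
    task_id="presto_to_gcs_basic",
    sql="select * from some_table",
    bucket="some-bucket",
)
# [END howto_operator_presto_to_gcs_basic]
"""

def extract(text: str, tag: str) -> str:
    # Keep everything after the START marker line, then cut at the END marker.
    body = text.split(f"# [START {tag}]\n", 1)[1]
    body = body.split(f"# [END {tag}]", 1)[0]
    return body.rstrip("\n")

snippet = extract(source, "howto_operator_presto_to_gcs_basic")
```

Moving the file to `tests/system/` keeps these markers intact, so the published docs keep rendering the same snippets from the new location.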
@@ -67,7 +67,7 @@ You can specify these options by the ``export_format`` parameter.

If you want a CSV file to be created, your operator call might look like this:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_presto_to_gcs_csv]
@@ -81,7 +81,7 @@ will be dumped from the database and uploaded to the bucket.

If you want to create a schema file, then an example operator call might look like this:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_presto_to_gcs_multiple_types]
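When a `schema_filename` is passed, the operator also uploads a BigQuery-style schema JSON next to the data files. A hedged sketch of what such a schema document looks like (the field names here are made up, not taken from the Presto example table):

```python
import json

# Hypothetical example of the BigQuery schema JSON a schema-file export
# produces: a list of field descriptors with name, type, and mode.
schema_fields = [
    {"name": "some_num", "type": "INT64", "mode": "NULLABLE"},
    {"name": "some_str", "type": "STRING", "mode": "NULLABLE"},
]
schema_json = json.dumps(schema_fields)
```

A file like this can later be handed to BigQuery when defining an external table over the exported objects.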
@@ -102,7 +102,7 @@ maximum allowed file size for a single object.

If you want to create 10 MB files, your code might look like this:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_read_data_from_gcs_many_chunks]
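The splitting described above can be sketched in plain Python: with `approx_max_file_size_bytes` set to 10 MB, a larger result set is written as several objects, and a `{}` placeholder in the filename is filled with the chunk index. The total size and filename template below are illustrative assumptions, not values from the example DAG:

```python
import math

# Assumed sizes for illustration: a 45 MB export split into ~10 MB objects.
approx_max_file_size_bytes = 10 * 1024 * 1024
total_export_bytes = 45 * 1024 * 1024

# Number of objects the export is split into.
n_chunks = math.ceil(total_export_bytes / approx_max_file_size_bytes)

# The "{}" in the filename parameter becomes the chunk index per object.
filename_template = "customer/{}.json"
object_names = [filename_template.format(i) for i in range(n_chunks)]
```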
@@ -123,7 +123,7 @@ For example, if you want to create an external table that allows you to create q
read data directly from GCS, then you can use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator`.
Using this operator looks like this:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_presto_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_external_table_multiple_types]
158 changes: 0 additions & 158 deletions tests/providers/google/cloud/transfers/test_presto_to_gcs_system.py

This file was deleted.

@@ -29,11 +29,15 @@
BigQueryDeleteDatasetOperator,
BigQueryInsertJobOperator,
)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.presto_to_gcs import PrestoToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule

+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_presto_to_gcs"
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", 'example-project')
-GCS_BUCKET = os.environ.get("GCP_PRESTO_TO_GCS_BUCKET_NAME", "INVALID BUCKET NAME")
-DATASET_NAME = os.environ.get("GCP_PRESTO_TO_GCS_DATASET_NAME", "test_presto_to_gcs_dataset")
+GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"

SOURCE_MULTIPLE_TYPES = "memory.default.test_multiple_types"
SOURCE_CUSTOMER_TABLE = "tpch.sf1.customer"
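The naming change in this hunk is the core of the AIP-47 migration: bucket and dataset names are derived from `DAG_ID` plus a per-environment suffix instead of dedicated per-resource environment variables. Reproduced standalone (the `"dev"` fallback is an assumption for illustration; the real test leaves `SYSTEM_TESTS_ENV_ID` without a default):

```python
import os

# Per-environment suffix so parallel test environments do not collide
# on bucket or dataset names.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "dev")
DAG_ID = "example_presto_to_gcs"

# Resource names are a pure function of the DAG and environment IDs.
GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
```

This lets the DAG itself create and delete its resources, since the names are predictable without any external configuration.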
@@ -47,16 +51,26 @@ def safe_name(s: str) -> str:


with models.DAG(
-dag_id="example_presto_to_gcs",
+dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
-tags=["example"],
+tags=["example", "presto"],
) as dag:

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

delete_dataset = BigQueryDeleteDatasetOperator(
-task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
+task_id="delete_dataset",
+dataset_id=DATASET_NAME,
+delete_contents=True,
+trigger_rule=TriggerRule.ALL_DONE,
)

+create_bucket = GCSCreateBucketOperator(
+task_id="create_bucket", bucket_name=GCS_BUCKET, project_id=GCP_PROJECT_ID
+)
+
+delete_bucket = GCSDeleteBucketOperator(
+task_id="delete_bucket", bucket_name=GCS_BUCKET, trigger_rule=TriggerRule.ALL_DONE
+)

# [START howto_operator_presto_to_gcs_basic]
@@ -178,15 +192,31 @@ def safe_name(s: str) -> str:
)
# [END howto_operator_presto_to_gcs_csv]

-create_dataset >> presto_to_gcs_basic
-create_dataset >> presto_to_gcs_multiple_types
-create_dataset >> presto_to_gcs_many_chunks
-create_dataset >> presto_to_gcs_csv
+(
+# TEST SETUP
+create_dataset
+>> create_bucket
+# TEST BODY
+>> presto_to_gcs_basic
+>> presto_to_gcs_multiple_types
+>> create_external_table_multiple_types
+>> read_data_from_gcs_multiple_types
+>> presto_to_gcs_many_chunks
+>> create_external_table_many_chunks
+>> read_data_from_gcs_many_chunks
+>> presto_to_gcs_csv
+# TEST TEARDOWN
+>> delete_dataset
+>> delete_bucket
+)

+from tests.system.utils.watcher import watcher

+# This test needs watcher in order to properly mark success/failure
+# when "tearDown" task with trigger rule is part of the DAG
+list(dag.tasks) >> watcher()

-presto_to_gcs_multiple_types >> create_external_table_multiple_types >> read_data_from_gcs_multiple_types
-presto_to_gcs_many_chunks >> create_external_table_many_chunks >> read_data_from_gcs_many_chunks
+from tests.system.utils import get_test_run  # noqa: E402

-presto_to_gcs_basic >> delete_dataset
-presto_to_gcs_csv >> delete_dataset
-read_data_from_gcs_multiple_types >> delete_dataset
-read_data_from_gcs_many_chunks >> delete_dataset
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
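The watcher comment above captures the key subtlety of AIP-47 DAGs: teardown tasks run under `TriggerRule.ALL_DONE`, so they succeed even when a test-body task failed, which on its own would leave the DAG run green. The watcher task runs under `ONE_FAILED` and raises, turning any upstream failure into a failed run. A pure-Python simulation of those two trigger-rule semantics (not the Airflow API):

```python
# Simulated trigger-rule predicates over upstream task states.
def all_done(upstream_states):
    # Teardown fires once every upstream finished, regardless of outcome.
    return all(s in ("success", "failed") for s in upstream_states)

def one_failed(upstream_states):
    # Watcher fires (and fails the run) as soon as any upstream failed.
    return any(s == "failed" for s in upstream_states)

states = ["success", "failed", "success"]  # a test-body task failed
teardown_runs = all_done(states)    # cleanup still happens
watcher_fires = one_failed(states)  # run is still marked failed
```

Without the watcher, `teardown_runs` being true would make the last tasks succeed and mask the body failure.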