From 68a1eb40c3bfb70ddeffb4dcb435d0deae007fac Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Wed, 27 Apr 2022 16:58:03 -0400 Subject: [PATCH] feat: Allow forecasting timestamp split. --- google/cloud/aiplatform/training_jobs.py | 39 ++++++-- .../test_automl_forecasting_training_jobs.py | 91 +++++++++++++++++++ 2 files changed, 123 insertions(+), 7 deletions(-) diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 667d4a558f..2dbd130555 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -408,7 +408,8 @@ def _create_input_data_config( that piece is ignored by the pipeline. Supported only for tabular and time series Datasets. - This parameter must be used with training_fraction_split, validation_fraction_split and test_fraction_split. + This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. gcs_destination_uri_prefix (str): Optional. The Google Cloud Storage location. @@ -669,7 +670,8 @@ def _run_job( that piece is ignored by the pipeline. Supported only for tabular and time series Datasets. - This parameter must be used with training_fraction_split, validation_fraction_split and test_fraction_split. + This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. model (~.model.Model): Optional. Describes the Model that may be uploaded (via [ModelService.UploadMode][]) by this TrainingPipeline. The @@ -3487,9 +3489,9 @@ def run( `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a piece of data the key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular and time series Datasets. - This parameter must be used with training_fraction_split, validation_fraction_split and test_fraction_split. 
+ This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. weight_column (str): Optional. Name of the column that should be used as the weight column. Higher values in this column give more importance to the row @@ -3681,9 +3683,9 @@ def _run( `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a piece of data the key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular and time series Datasets. - This parameter must be used with training_fraction_split, validation_fraction_split and test_fraction_split. + This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. weight_column (str): Optional. Name of the column that should be used as the weight column. Higher values in this column give more importance to the row @@ -4022,6 +4024,7 @@ def run( validation_fraction_split: Optional[float] = None, test_fraction_split: Optional[float] = None, predefined_split_column_name: Optional[str] = None, + timestamp_split_column_name: Optional[str] = None, weight_column: Optional[str] = None, time_series_attribute_columns: Optional[List[str]] = None, context_window: Optional[int] = None, @@ -4106,6 +4109,16 @@ def run( ignored by the pipeline. Supported only for tabular and time series Datasets. + timestamp_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The values of the key (the values in + the column) must be in RFC 3339 `date-time` format, where + `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a + piece of data the key is not present or has an invalid value, + that piece is ignored by the pipeline. + Supported only for tabular and time series Datasets. + This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. weight_column (str): Optional. 
Name of the column that should be used as the weight column. Higher values in this column give more importance to the row @@ -4229,6 +4242,7 @@ def run( validation_fraction_split=validation_fraction_split, test_fraction_split=test_fraction_split, predefined_split_column_name=predefined_split_column_name, + timestamp_split_column_name=timestamp_split_column_name, weight_column=weight_column, time_series_attribute_columns=time_series_attribute_columns, context_window=context_window, @@ -4260,6 +4274,7 @@ def _run( validation_fraction_split: Optional[float] = None, test_fraction_split: Optional[float] = None, predefined_split_column_name: Optional[str] = None, + timestamp_split_column_name: Optional[str] = None, weight_column: Optional[str] = None, time_series_attribute_columns: Optional[List[str]] = None, context_window: Optional[int] = None, @@ -4352,6 +4367,16 @@ def _run( ignored by the pipeline. Supported only for tabular and time series Datasets. + timestamp_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The values of the key (the values in + the column) must be in RFC 3339 `date-time` format, where + `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a + piece of data the key is not present or has an invalid value, + that piece is ignored by the pipeline. + Supported only for tabular and time series Datasets. + This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. weight_column (str): Optional. Name of the column that should be used as the weight column. 
Higher values in this column give more importance to the row @@ -4511,7 +4536,7 @@ def _run( validation_fraction_split=validation_fraction_split, test_fraction_split=test_fraction_split, predefined_split_column_name=predefined_split_column_name, - timestamp_split_column_name=None, # Not supported by AutoMLForecasting + timestamp_split_column_name=timestamp_split_column_name, model=model, create_request_timeout=create_request_timeout, ) diff --git a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py index b04fb0e84f..6a96d656e8 100644 --- a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py +++ b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py @@ -148,6 +148,7 @@ _TEST_FRACTION_SPLIT_TEST = 0.2 _TEST_SPLIT_PREDEFINED_COLUMN_NAME = "split" +_TEST_SPLIT_TIMESTAMP_COLUMN_NAME = "timestamp" @pytest.fixture @@ -768,6 +769,96 @@ def test_splits_fraction( timeout=None, ) + @pytest.mark.parametrize("sync", [True, False]) + def test_splits_timestamp( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + mock_dataset_time_series, + mock_model_service_get, + sync, + ): + """Initiate aiplatform with encryption key name. 
+ + Create and run an AutoML Forecasting training job with a timestamp + split, verify calls and return value. + """ + + aiplatform.init( + project=_TEST_PROJECT, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + job = AutoMLForecastingTrainingJob( + display_name=_TEST_DISPLAY_NAME, + optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME, + column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS, + ) + + model_from_job = job.run( + dataset=mock_dataset_time_series, + training_fraction_split=_TEST_FRACTION_SPLIT_TRAINING, + validation_fraction_split=_TEST_FRACTION_SPLIT_VALIDATION, + test_fraction_split=_TEST_FRACTION_SPLIT_TEST, + timestamp_split_column_name=_TEST_SPLIT_TIMESTAMP_COLUMN_NAME, + target_column=_TEST_TRAINING_TARGET_COLUMN, + time_column=_TEST_TRAINING_TIME_COLUMN, + time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN, + unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS, + available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS, + forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON, + data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT, + data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT, + model_display_name=_TEST_MODEL_DISPLAY_NAME, + weight_column=_TEST_TRAINING_WEIGHT_COLUMN, + time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS, + context_window=_TEST_TRAINING_CONTEXT_WINDOW, + budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS, + export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS, + export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI, + export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION, + quantiles=_TEST_TRAINING_QUANTILES, + validation_options=_TEST_TRAINING_VALIDATION_OPTIONS, + sync=sync, + create_request_timeout=None, + ) + + if not sync: + 
model_from_job.wait() + + true_split = gca_training_pipeline.TimestampSplit( + training_fraction=_TEST_FRACTION_SPLIT_TRAINING, + validation_fraction=_TEST_FRACTION_SPLIT_VALIDATION, + test_fraction=_TEST_FRACTION_SPLIT_TEST, + key=_TEST_SPLIT_TIMESTAMP_COLUMN_NAME, + ) + + true_managed_model = gca_model.Model( + display_name=_TEST_MODEL_DISPLAY_NAME, + encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC, + ) + + true_input_data_config = gca_training_pipeline.InputDataConfig( + timestamp_split=true_split, dataset_id=mock_dataset_time_series.name + ) + + true_training_pipeline = gca_training_pipeline.TrainingPipeline( + display_name=_TEST_DISPLAY_NAME, + training_task_definition=( + schema.training_job.definition.automl_forecasting + ), + training_task_inputs=_TEST_TRAINING_TASK_INPUTS, + model_to_upload=true_managed_model, + input_data_config=true_input_data_config, + encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=initializer.global_config.common_location_path(), + training_pipeline=true_training_pipeline, + timeout=None, + ) + @pytest.mark.parametrize("sync", [True, False]) def test_splits_predefined( self,