diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index cc67a6d57895..84794bc595e5 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -26,6 +26,10 @@ func resourceAwsDmsEndpoint() *schema.Resource { State: schema.ImportStatePassthrough, }, + Timeouts: &schema.ResourceTimeout{ + Delete: schema.DefaultTimeout(10 * time.Minute), + }, + Schema: map[string]*schema.Schema{ "certificate_arn": { Type: schema.TypeString, @@ -299,6 +303,32 @@ func resourceAwsDmsEndpoint() *schema.Resource { Optional: true, Default: "NONE", }, + "data_format": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringInSlice([]string{ + dms.DataFormatValueCsv, + dms.DataFormatValueParquet, + }, false), + }, + "encryption_mode": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringInSlice([]string{ + // required to hard-code strings here as the aws-go-sdk uses the + // different strings here (sse-s3 vs SSE_S3) and it gets rejected by the AWS API + "SSE_S3", //dms.EncryptionModeValueSseS3, + "SSE_KMS", //dms.EncryptionModeValueSseKms, + }, false), + // API returns this error with ModifyEndpoint: + // InvalidParameterCombinationException: Only SSE_S3 encryption mode supported + // ^ This message is a bit misleading as SSE_KMS is supported, but requires a new endpoint + ForceNew: true, + }, + "server_side_encryption_kms_key_id": { + Type: schema.TypeString, + Optional: true, + }, }, }, }, @@ -389,13 +419,16 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro request.DatabaseName = aws.String(d.Get("database_name").(string)) case "s3": request.S3Settings = &dms.S3Settings{ - ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), - ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)), - CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)), - CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)), - BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), - BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), - CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)), + ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), + ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)), + CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)), + CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)), + BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), + BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), + CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)), + DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)), + EncryptionMode: aws.String(d.Get("s3_settings.0.encryption_mode").(string)), + ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)), } default: request.Password = aws.String(d.Get("password").(string)) @@ -619,18 +652,23 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro hasChanges = true } case "s3": + // Purposely excluded s3_settings.0.encryption_mode here as the AWS API disallows setting encryption_mode to SSE_KMS + // even if the endpoint is already configured for it. As such, changing that attribute requires a delete+create. if d.HasChanges( "s3_settings.0.service_access_role_arn", "s3_settings.0.external_table_definition", "s3_settings.0.csv_row_delimiter", "s3_settings.0.csv_delimiter", "s3_settings.0.bucket_folder", - "s3_settings.0.bucket_name", "s3_settings.0.compression_type") { + "s3_settings.0.bucket_name", "s3_settings.0.compression_type", "s3_settings.0.data_format", + "s3_settings.0.server_side_encryption_kms_key_id") { request.S3Settings = &dms.S3Settings{ - ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), - ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)), - CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)), - CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)), - BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), - BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), - CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)), + ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), + ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)), + CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)), + CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)), + BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), + BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), + CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)), + DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)), + ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)), } request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3') hasChanges = true @@ -686,9 +724,54 @@ func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) erro log.Printf("[DEBUG] DMS delete endpoint: %#v", request) _, err := conn.DeleteEndpoint(request) + if err != nil { + return err + } + + err = waitUntilAwsDmsEndpointIsDeleted(d.Id(), conn, d.Timeout(schema.TimeoutDelete)) return err } +func waitUntilAwsDmsEndpointIsDeleted(endpointId string, conn *dms.DatabaseMigrationService, timeout time.Duration) error { + stateConf := &resource.StateChangeConf{ + Pending: []string{"active", "deleting"}, // Did not find constants in the SDK for these, determined by experimentation. + Target: []string{}, + Refresh: waitUntilAwsDmsEndpointIsDeletedRefresh(endpointId, conn), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, // Wait 30 secs before starting + } + _, err := stateConf.WaitForState() + return err +} + +func waitUntilAwsDmsEndpointIsDeletedRefresh(endpointId string, conn *dms.DatabaseMigrationService) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + + // query AWS for the status of the endpoint by endpointId. + response, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{ + Filters: []*dms.Filter{ + { + Name: aws.String("endpoint-id"), + Values: []*string{aws.String(endpointId)}, + }, + }, + }) + + // if there is an error - that may mean the endpoint was removed and thus we can + // indicate the deletion was successful (by passing an empty status back to the caller) + if err != nil { + if isAWSErr(err, "ResourceNotFoundFault", "") { + return nil, "", nil // success, the endpoint is removed + } + log.Printf("Error on retrieving DB Instance when waiting: %s", err) + return nil, "", err + } + + return response, aws.StringValue(response.Endpoints[0].Status), nil + } +} + func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) error { d.SetId(*endpoint.EndpointIdentifier) @@ -816,13 +899,16 @@ func flattenDmsS3Settings(settings *dms.S3Settings) []map[string]interface{} { } m := map[string]interface{}{ - "service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn), - "external_table_definition": aws.StringValue(settings.ExternalTableDefinition), - "csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter), - "csv_delimiter": aws.StringValue(settings.CsvDelimiter), - "bucket_folder": aws.StringValue(settings.BucketFolder), - "bucket_name": aws.StringValue(settings.BucketName), - "compression_type": aws.StringValue(settings.CompressionType), + "service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn), + "external_table_definition": aws.StringValue(settings.ExternalTableDefinition), + "csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter), + "csv_delimiter": aws.StringValue(settings.CsvDelimiter), + "bucket_folder": aws.StringValue(settings.BucketFolder), + "bucket_name": aws.StringValue(settings.BucketName), + "compression_type": aws.StringValue(settings.CompressionType), + "data_format": aws.StringValue(settings.DataFormat), + "encryption_mode": aws.StringValue(settings.EncryptionMode), + "server_side_encryption_kms_key_id": aws.StringValue(settings.ServerSideEncryptionKmsKeyId), } return []map[string]interface{}{m} diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index 7b498110c0fb..74466b5a0741 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -60,7 +60,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { CheckDestroy: dmsEndpointDestroy, Steps: []resource.TestStep{ { - Config: dmsEndpointS3Config(randId), + Config: composeConfig(dmsEndpointS3Config(randId), dmsEndpointS3ConfigBase(randId)), Check: resource.ComposeTestCheckFunc( checkDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), @@ -96,6 +96,74 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { }) } +// NOTE because of the change from s3_settings.0.encryption_mode from SSE_S3 to SSE_KMS this test will invoke a +// forced destruction and recreation of the resource. Only add to this test case if you're okay with that (i.e. NOT an update) +func TestAccAwsDmsEndpoint_S3Encrypted(t *testing.T) { + resourceName := "aws_dms_endpoint.dms_endpoint" + randId := acctest.RandString(8) + "-s3" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: dmsEndpointDestroy, + Steps: []resource.TestStep{ + { + Config: composeConfig(dmsEndpointS3ConfigEncrypted(randId), dmsEndpointS3ConfigBase(randId)), + Check: resource.ComposeTestCheckFunc( + checkDmsEndpointExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", ""), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\n"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", ","), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", ""), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "bucket_name"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "NONE"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", ""), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_S3"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"password"}, + }, + { + Config: composeConfig(dmsEndpointS3ConfigEncryptedUpdate(randId), dmsEndpointS3ConfigBase(randId)), + Check: resource.ComposeTestCheckFunc( + checkDmsEndpointExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\r"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", "."), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", "new-bucket_folder"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "new-bucket_name"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "GZIP"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "parquet"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_KMS"), + ), + }, + { + Config: composeConfig(dmsEndpointS3ConfigEncryptedUpdateMaintainKms(randId), dmsEndpointS3ConfigBase(randId)), + Check: resource.ComposeTestCheckFunc( + checkDmsEndpointExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\r"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", "."), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", "new-bucket_folder"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "new-bucket_name"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", ""), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "parquet"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_KMS"), + ), + }, + }, + }) +} + func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { resourceName := "aws_dms_endpoint.dms_endpoint" randId := acctest.RandString(8) + "-dynamodb" @@ -692,6 +760,112 @@ EOF `, randId) } +func dmsEndpointS3ConfigEncrypted(randId string) string { + return fmt.Sprintf(` +data "aws_partition" "current" {} + +resource "aws_dms_endpoint" "dms_endpoint" { + endpoint_id = "tf-test-dms-endpoint-%[1]s" + endpoint_type = "target" + engine_name = "s3" + ssl_mode = "none" + extra_connection_attributes = "" + + tags = { + Name = "tf-test-s3-endpoint-%[1]s" + Update = "to-update" + Remove = "to-remove" + } + + s3_settings { + service_access_role_arn = "${aws_iam_role.iam_role.arn}" + bucket_name = "bucket_name" + encryption_mode = "SSE_S3" + } + + depends_on = ["aws_iam_role_policy.dms_s3_access"] +} +`, randId) +} + +func dmsEndpointS3ConfigEncryptedUpdate(randId string) string { + return fmt.Sprintf(` +data "aws_partition" "current" {} + +data "aws_kms_alias" "dms" { + name = "alias/aws/dms" +} + +resource "aws_dms_endpoint" "dms_endpoint" { + endpoint_id = "tf-test-dms-endpoint-%[1]s" + endpoint_type = "target" + engine_name = "s3" + ssl_mode = "none" + extra_connection_attributes = "key=value;" + + tags = { + Name = "tf-test-s3-endpoint-%[1]s" + Update = "updated" + Add = "added" + } + + s3_settings { + service_access_role_arn = "${aws_iam_role.iam_role.arn}" + external_table_definition = "new-external_table_definition" + csv_row_delimiter = "\\r" + csv_delimiter = "." + bucket_folder = "new-bucket_folder" + bucket_name = "new-bucket_name" + compression_type = "GZIP" + data_format = "parquet" + encryption_mode = "SSE_KMS" + server_side_encryption_kms_key_id = "${data.aws_kms_alias.dms.target_key_arn}" + } +} +`, randId) +} + +// Designed to test the situation when an udpate needs to occur to the s3_settings block +// but NOT the encryption_mode attribute. This tests we can update s3_settings without angering the AWS API +// which refuses any modification with encryption_mode=SSE_KMS (even if it was set before) unless encryption_mode is +// omitted from the modify request. We set here by removing GZIP compression. +func dmsEndpointS3ConfigEncryptedUpdateMaintainKms(randId string) string { + return fmt.Sprintf(` +data "aws_partition" "current" {} + +data "aws_kms_alias" "dms" { + name = "alias/aws/dms" +} + +resource "aws_dms_endpoint" "dms_endpoint" { + endpoint_id = "tf-test-dms-endpoint-%[1]s" + endpoint_type = "target" + engine_name = "s3" + ssl_mode = "none" + extra_connection_attributes = "key=value;" + + tags = { + Name = "tf-test-s3-endpoint-%[1]s" + Update = "updated" + Add = "added" + } + + s3_settings { + service_access_role_arn = "${aws_iam_role.iam_role.arn}" + external_table_definition = "new-external_table_definition" + csv_row_delimiter = "\\r" + csv_delimiter = "." + bucket_folder = "new-bucket_folder" + bucket_name = "new-bucket_name" + compression_type = "" + data_format = "parquet" + encryption_mode = "SSE_KMS" + server_side_encryption_kms_key_id = "${data.aws_kms_alias.dms.target_key_arn}" + } +} +`, randId) +} + func dmsEndpointS3Config(randId string) string { return fmt.Sprintf(` data "aws_partition" "current" {} @@ -716,7 +890,11 @@ resource "aws_dms_endpoint" "dms_endpoint" { depends_on = [aws_iam_role_policy.dms_s3_access] } +`, randId) +} +func dmsEndpointS3ConfigBase(randId string) string { + return fmt.Sprintf(` resource "aws_iam_role" "iam_role" { name = "tf-test-iam-s3-role-%[1]s" diff --git a/website/docs/r/dms_endpoint.html.markdown b/website/docs/r/dms_endpoint.html.markdown index 308719e4e3a1..23965dec5e69 100644 --- a/website/docs/r/dms_endpoint.html.markdown +++ b/website/docs/r/dms_endpoint.html.markdown @@ -125,6 +125,16 @@ The `s3_settings` configuration block supports the following arguments: * `csv_row_delimiter` - (Optional) Delimiter used to separate rows in the source files. Defaults to `\n`. * `external_table_definition` - (Optional) JSON document that describes how AWS DMS should interpret the data. * `service_access_role_arn` - (Optional) Amazon Resource Name (ARN) of the IAM Role with permissions to read from or write to the S3 Bucket. +* `data_format` - (Optional) Specify the data format. Valid values are `csv` and `parquet`. +* `encryption_mode` - (Optional) Set to encrypt target files. Valid vaules are `SSE_S3` and `SSE_KMS`. +* `server_side_encryption_kms_key_id` - (Optional) When using `encryption_mode` as `SSE_KMS`, provide the arn for the KMS key to use. + +## Timeouts + +`aws_dms_endpoint` provides the following +[Timeouts](/docs/configuration/resources.html#timeouts) configuration options: + +- `delete` - (Default `10 minutes`) Used for destroying endpoints. ## Attributes Reference