Skip to content

Commit

Permalink
feat(dms_endpoint): add DataFormat, EncryptionMode, and encryption km…
Browse files Browse the repository at this point in the history
…s arn to the S3 settings block
  • Loading branch information
doransmestad committed Jul 22, 2020
1 parent 2f23a59 commit ed5e4cc
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 23 deletions.
126 changes: 104 additions & 22 deletions aws/resource_aws_dms_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func resourceAwsDmsEndpoint() *schema.Resource {
State: schema.ImportStatePassthrough,
},

Timeouts: &schema.ResourceTimeout{
Delete: schema.DefaultTimeout(10 * time.Minute),
},

Schema: map[string]*schema.Schema{
"certificate_arn": {
Type: schema.TypeString,
Expand Down Expand Up @@ -299,6 +303,32 @@ func resourceAwsDmsEndpoint() *schema.Resource {
Optional: true,
Default: "NONE",
},
"data_format": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{
dms.DataFormatValueCsv,
dms.DataFormatValueParquet,
}, false),
},
"encryption_mode": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{
// required to hard-code strings here as the aws-go-sdk uses the
// different strings here (sse-s3 vs SSE_S3) and it gets rejected by the AWS API
"SSE_S3", //dms.EncryptionModeValueSseS3,
"SSE_KMS", //dms.EncryptionModeValueSseKms,
}, false),
// API returns this error with ModifyEndpoint:
// InvalidParameterCombinationException: Only SSE_S3 encryption mode supported
// ^ This message is a bit misleading as SSE_KMS is supported, but requires a new endpoint
ForceNew: true,
},
"server_side_encryption_kms_key_id": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
Expand Down Expand Up @@ -389,13 +419,16 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro
request.DatabaseName = aws.String(d.Get("database_name").(string))
case "s3":
request.S3Settings = &dms.S3Settings{
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)),
EncryptionMode: aws.String(d.Get("s3_settings.0.encryption_mode").(string)),
ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)),
}
default:
request.Password = aws.String(d.Get("password").(string))
Expand Down Expand Up @@ -619,18 +652,23 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
hasChanges = true
}
case "s3":
// Purposely excluded s3_settings.0.encryption_mode here as the AWS API disallows setting encryption_mode to SSE_KMS
// even if the endpoint is already configured for it. As such, changing that attribute requires a delete+create.
if d.HasChanges(
"s3_settings.0.service_access_role_arn", "s3_settings.0.external_table_definition",
"s3_settings.0.csv_row_delimiter", "s3_settings.0.csv_delimiter", "s3_settings.0.bucket_folder",
"s3_settings.0.bucket_name", "s3_settings.0.compression_type") {
"s3_settings.0.bucket_name", "s3_settings.0.compression_type", "s3_settings.0.data_format",
"s3_settings.0.server_side_encryption_kms_key_id") {
request.S3Settings = &dms.S3Settings{
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)),
ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)),
}
request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3')
hasChanges = true
Expand Down Expand Up @@ -686,9 +724,50 @@ func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) erro
log.Printf("[DEBUG] DMS delete endpoint: %#v", request)

_, err := conn.DeleteEndpoint(request)
waitUntilAwsDmsEndpointIsDeleted(d.Id(), conn, d.Timeout(schema.TimeoutDelete))
return err
}

func waitUntilAwsDmsEndpointIsDeleted(endpointId string, conn *dms.DatabaseMigrationService, timeout time.Duration) error {
stateConf := &resource.StateChangeConf{
Pending: []string{"active", "deleting"}, // Did not find constants in the SDK for these, determined by experimentation.
Target: []string{},
Refresh: waitUntilAwsDmsEndpointIsDeletedRefresh(endpointId, conn),
Timeout: timeout,
MinTimeout: 10 * time.Second,
Delay: 30 * time.Second, // Wait 30 secs before starting
}
_, err := stateConf.WaitForState()
return err
}

func waitUntilAwsDmsEndpointIsDeletedRefresh(endpointId string, conn *dms.DatabaseMigrationService) resource.StateRefreshFunc {
return func() (interface{}, string, error) {

// query AWS for the status of the endpoint by endpointId.
response, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{
Filters: []*dms.Filter{
{
Name: aws.String("endpoint-id"),
Values: []*string{aws.String(endpointId)},
},
},
})

// if there is an error - that may mean the endpoint was removed and thus we can
// indicate the deletion was successful (by passing an empty status back to the caller)
if err != nil {
if isAWSErr(err, "ResourceNotFoundFault", "") {
return nil, "", nil // success, the endpoint is removed
}
log.Printf("Error on retrieving DB Instance when waiting: %s", err)
return nil, "", err
}

return response, aws.StringValue(response.Endpoints[0].Status), nil
}
}

func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) error {
d.SetId(*endpoint.EndpointIdentifier)

Expand Down Expand Up @@ -816,13 +895,16 @@ func flattenDmsS3Settings(settings *dms.S3Settings) []map[string]interface{} {
}

m := map[string]interface{}{
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"external_table_definition": aws.StringValue(settings.ExternalTableDefinition),
"csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter),
"csv_delimiter": aws.StringValue(settings.CsvDelimiter),
"bucket_folder": aws.StringValue(settings.BucketFolder),
"bucket_name": aws.StringValue(settings.BucketName),
"compression_type": aws.StringValue(settings.CompressionType),
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"external_table_definition": aws.StringValue(settings.ExternalTableDefinition),
"csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter),
"csv_delimiter": aws.StringValue(settings.CsvDelimiter),
"bucket_folder": aws.StringValue(settings.BucketFolder),
"bucket_name": aws.StringValue(settings.BucketName),
"compression_type": aws.StringValue(settings.CompressionType),
"data_format": aws.StringValue(settings.DataFormat),
"encryption_mode": aws.StringValue(settings.EncryptionMode),
"server_side_encryption_kms_key_id": aws.StringValue(settings.ServerSideEncryptionKmsKeyId),
}

return []map[string]interface{}{m}
Expand Down
174 changes: 173 additions & 1 deletion aws/resource_aws_dms_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) {
CheckDestroy: dmsEndpointDestroy,
Steps: []resource.TestStep{
{
Config: dmsEndpointS3Config(randId),
Config: composeConfig(dmsEndpointS3Config(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
Expand Down Expand Up @@ -96,6 +96,74 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) {
})
}

// NOTE because of the change from s3_settings.0.encryption_mode from SSE_S3 to SSE_KMS this test will invoke a
// forced destruction and recreation of the resource. Only add to this test case if you're okay with that (i.e. NOT an update)
func TestAccAwsDmsEndpoint_S3Encrypted(t *testing.T) {
resourceName := "aws_dms_endpoint.dms_endpoint"
randId := acctest.RandString(8) + "-s3"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: dmsEndpointDestroy,
Steps: []resource.TestStep{
{
Config: composeConfig(dmsEndpointS3ConfigEncrypted(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\n"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", ","),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "bucket_name"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "NONE"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_S3"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"password"},
},
{
Config: composeConfig(dmsEndpointS3ConfigEncryptedUpdate(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\r"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", "."),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", "new-bucket_folder"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "new-bucket_name"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "GZIP"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "parquet"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_KMS"),
),
},
{
Config: composeConfig(dmsEndpointS3ConfigEncryptedUpdateMaintainKms(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\r"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", "."),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", "new-bucket_folder"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "new-bucket_name"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "parquet"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_KMS"),
),
},
},
})
}

func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) {
resourceName := "aws_dms_endpoint.dms_endpoint"
randId := acctest.RandString(8) + "-dynamodb"
Expand Down Expand Up @@ -684,6 +752,106 @@ EOF
`, randId)
}

func dmsEndpointS3ConfigEncrypted(randId string) string {
return fmt.Sprintf(`
resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "target"
engine_name = "s3"
ssl_mode = "none"
extra_connection_attributes = ""
tags = {
Name = "tf-test-s3-endpoint-%[1]s"
Update = "to-update"
Remove = "to-remove"
}
s3_settings {
service_access_role_arn = "${aws_iam_role.iam_role.arn}"
bucket_name = "bucket_name"
encryption_mode = "SSE_S3"
}
depends_on = ["aws_iam_role_policy.dms_s3_access"]
}
`, randId)
}

func dmsEndpointS3ConfigEncryptedUpdate(randId string) string {
return fmt.Sprintf(`
data "aws_kms_alias" "dms" {
name = "alias/aws/dms"
}
resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "target"
engine_name = "s3"
ssl_mode = "none"
extra_connection_attributes = "key=value;"
tags = {
Name = "tf-test-s3-endpoint-%[1]s"
Update = "updated"
Add = "added"
}
s3_settings {
service_access_role_arn = "${aws_iam_role.iam_role.arn}"
external_table_definition = "new-external_table_definition"
csv_row_delimiter = "\\r"
csv_delimiter = "."
bucket_folder = "new-bucket_folder"
bucket_name = "new-bucket_name"
compression_type = "GZIP"
data_format = "parquet"
encryption_mode = "SSE_KMS"
server_side_encryption_kms_key_id = "${data.aws_kms_alias.dms.target_key_arn}"
}
}
`, randId)
}

// Designed to test the situation when an udpate needs to occur to the s3_settings block
// but NOT the encryption_mode attribute. This tests we can update s3_settings without angering the AWS API
// which refuses any modification with encryption_mode=SSE_KMS (even if it was set before) unless encryption_mode is
// omitted from the modify request. We set here by removing GZIP compression.
func dmsEndpointS3ConfigEncryptedUpdateMaintainKms(randId string) string {
return fmt.Sprintf(`
data "aws_kms_alias" "dms" {
name = "alias/aws/dms"
}
resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "target"
engine_name = "s3"
ssl_mode = "none"
extra_connection_attributes = "key=value;"
tags = {
Name = "tf-test-s3-endpoint-%[1]s"
Update = "updated"
Add = "added"
}
s3_settings {
service_access_role_arn = "${aws_iam_role.iam_role.arn}"
external_table_definition = "new-external_table_definition"
csv_row_delimiter = "\\r"
csv_delimiter = "."
bucket_folder = "new-bucket_folder"
bucket_name = "new-bucket_name"
compression_type = ""
data_format = "parquet"
encryption_mode = "SSE_KMS"
server_side_encryption_kms_key_id = "${data.aws_kms_alias.dms.target_key_arn}"
}
}
`, randId)
}

func dmsEndpointS3Config(randId string) string {
return fmt.Sprintf(`
resource "aws_dms_endpoint" "dms_endpoint" {
Expand All @@ -706,7 +874,11 @@ resource "aws_dms_endpoint" "dms_endpoint" {
depends_on = ["aws_iam_role_policy.dms_s3_access"]
}
`, randId)
}

func dmsEndpointS3ConfigBase(randId string) string {
return fmt.Sprintf(`
resource "aws_iam_role" "iam_role" {
name = "tf-test-iam-s3-role-%[1]s"
Expand Down
10 changes: 10 additions & 0 deletions website/docs/r/dms_endpoint.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ The `s3_settings` configuration block supports the following arguments:
* `csv_row_delimiter` - (Optional) Delimiter used to separate rows in the source files. Defaults to `\n`.
* `external_table_definition` - (Optional) JSON document that describes how AWS DMS should interpret the data.
* `service_access_role_arn` - (Optional) Amazon Resource Name (ARN) of the IAM Role with permissions to read from or write to the S3 Bucket.
* `data_format` - (Optional) Specify the data format. Valid values are `csv` and `parquet`.
* `encryption_mode` - (Optional) Set to encrypt target files. Valid vaules are `SSE_S3` and `SSE_KMS`.
* `server_side_encryption_kms_key_id` - (Optional) When using `encryption_mode` as `SSE_KMS`, provide the arn for the KMS key to use.

## Timeouts

`aws_dms_endpoint` provides the following
[Timeouts](/docs/configuration/resources.html#timeouts) configuration options:

- `delete` - (Default `10 minutes`) Used for destroying endpoints.

## Attributes Reference

Expand Down

0 comments on commit ed5e4cc

Please sign in to comment.