Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dms_endpoint): add DataFormat, EncryptionMode, and KmsKeyId to the S3 settings block #14340

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 108 additions & 22 deletions aws/resource_aws_dms_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func resourceAwsDmsEndpoint() *schema.Resource {
State: schema.ImportStatePassthrough,
},

Timeouts: &schema.ResourceTimeout{
Delete: schema.DefaultTimeout(10 * time.Minute),
},

Schema: map[string]*schema.Schema{
"certificate_arn": {
Type: schema.TypeString,
Expand Down Expand Up @@ -299,6 +303,32 @@ func resourceAwsDmsEndpoint() *schema.Resource {
Optional: true,
Default: "NONE",
},
"data_format": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{
dms.DataFormatValueCsv,
dms.DataFormatValueParquet,
}, false),
},
"encryption_mode": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{
// required to hard-code strings here as the aws-go-sdk uses the
// different strings here (sse-s3 vs SSE_S3) and it gets rejected by the AWS API
"SSE_S3", //dms.EncryptionModeValueSseS3,
"SSE_KMS", //dms.EncryptionModeValueSseKms,
}, false),
// API returns this error with ModifyEndpoint:
// InvalidParameterCombinationException: Only SSE_S3 encryption mode supported
// ^ This message is a bit misleading as SSE_KMS is supported, but requires a new endpoint
ForceNew: true,
},
"server_side_encryption_kms_key_id": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
Expand Down Expand Up @@ -389,13 +419,16 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro
request.DatabaseName = aws.String(d.Get("database_name").(string))
case "s3":
request.S3Settings = &dms.S3Settings{
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)),
EncryptionMode: aws.String(d.Get("s3_settings.0.encryption_mode").(string)),
ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)),
}
default:
request.Password = aws.String(d.Get("password").(string))
Expand Down Expand Up @@ -619,18 +652,23 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
hasChanges = true
}
case "s3":
// Purposely excluded s3_settings.0.encryption_mode here as the AWS API disallows setting encryption_mode to SSE_KMS
// even if the endpoint is already configured for it. As such, changing that attribute requires a delete+create.
if d.HasChanges(
"s3_settings.0.service_access_role_arn", "s3_settings.0.external_table_definition",
"s3_settings.0.csv_row_delimiter", "s3_settings.0.csv_delimiter", "s3_settings.0.bucket_folder",
"s3_settings.0.bucket_name", "s3_settings.0.compression_type") {
"s3_settings.0.bucket_name", "s3_settings.0.compression_type", "s3_settings.0.data_format",
"s3_settings.0.server_side_encryption_kms_key_id") {
request.S3Settings = &dms.S3Settings{
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)),
ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)),
}
request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3')
hasChanges = true
Expand Down Expand Up @@ -686,9 +724,54 @@ func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) erro
log.Printf("[DEBUG] DMS delete endpoint: %#v", request)

_, err := conn.DeleteEndpoint(request)
if err != nil {
return err
}

err = waitUntilAwsDmsEndpointIsDeleted(d.Id(), conn, d.Timeout(schema.TimeoutDelete))
return err
}

func waitUntilAwsDmsEndpointIsDeleted(endpointId string, conn *dms.DatabaseMigrationService, timeout time.Duration) error {
stateConf := &resource.StateChangeConf{
Pending: []string{"active", "deleting"}, // Did not find constants in the SDK for these, determined by experimentation.
Target: []string{},
Refresh: waitUntilAwsDmsEndpointIsDeletedRefresh(endpointId, conn),
Timeout: timeout,
MinTimeout: 10 * time.Second,
Delay: 30 * time.Second, // Wait 30 secs before starting
}
_, err := stateConf.WaitForState()
return err
}

func waitUntilAwsDmsEndpointIsDeletedRefresh(endpointId string, conn *dms.DatabaseMigrationService) resource.StateRefreshFunc {
return func() (interface{}, string, error) {

// query AWS for the status of the endpoint by endpointId.
response, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{
Filters: []*dms.Filter{
{
Name: aws.String("endpoint-id"),
Values: []*string{aws.String(endpointId)},
},
},
})

// if there is an error - that may mean the endpoint was removed and thus we can
// indicate the deletion was successful (by passing an empty status back to the caller)
if err != nil {
if isAWSErr(err, "ResourceNotFoundFault", "") {
return nil, "", nil // success, the endpoint is removed
}
log.Printf("Error on retrieving DB Instance when waiting: %s", err)
return nil, "", err
}

return response, aws.StringValue(response.Endpoints[0].Status), nil
}
}

func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) error {
d.SetId(*endpoint.EndpointIdentifier)

Expand Down Expand Up @@ -816,13 +899,16 @@ func flattenDmsS3Settings(settings *dms.S3Settings) []map[string]interface{} {
}

m := map[string]interface{}{
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"external_table_definition": aws.StringValue(settings.ExternalTableDefinition),
"csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter),
"csv_delimiter": aws.StringValue(settings.CsvDelimiter),
"bucket_folder": aws.StringValue(settings.BucketFolder),
"bucket_name": aws.StringValue(settings.BucketName),
"compression_type": aws.StringValue(settings.CompressionType),
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"external_table_definition": aws.StringValue(settings.ExternalTableDefinition),
"csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter),
"csv_delimiter": aws.StringValue(settings.CsvDelimiter),
"bucket_folder": aws.StringValue(settings.BucketFolder),
"bucket_name": aws.StringValue(settings.BucketName),
"compression_type": aws.StringValue(settings.CompressionType),
"data_format": aws.StringValue(settings.DataFormat),
"encryption_mode": aws.StringValue(settings.EncryptionMode),
"server_side_encryption_kms_key_id": aws.StringValue(settings.ServerSideEncryptionKmsKeyId),
}

return []map[string]interface{}{m}
Expand Down
180 changes: 179 additions & 1 deletion aws/resource_aws_dms_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) {
CheckDestroy: dmsEndpointDestroy,
Steps: []resource.TestStep{
{
Config: dmsEndpointS3Config(randId),
Config: composeConfig(dmsEndpointS3Config(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
Expand Down Expand Up @@ -96,6 +96,74 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) {
})
}

// NOTE because of the change from s3_settings.0.encryption_mode from SSE_S3 to SSE_KMS this test will invoke a
// forced destruction and recreation of the resource. Only add to this test case if you're okay with that (i.e. NOT an update)
func TestAccAwsDmsEndpoint_S3Encrypted(t *testing.T) {
resourceName := "aws_dms_endpoint.dms_endpoint"
randId := acctest.RandString(8) + "-s3"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: dmsEndpointDestroy,
Steps: []resource.TestStep{
{
Config: composeConfig(dmsEndpointS3ConfigEncrypted(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\n"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", ","),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "bucket_name"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "NONE"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_S3"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"password"},
},
{
Config: composeConfig(dmsEndpointS3ConfigEncryptedUpdate(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\r"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", "."),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", "new-bucket_folder"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "new-bucket_name"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "GZIP"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "parquet"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_KMS"),
),
},
{
Config: composeConfig(dmsEndpointS3ConfigEncryptedUpdateMaintainKms(randId), dmsEndpointS3ConfigBase(randId)),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\r"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_delimiter", "."),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", "new-bucket_folder"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "new-bucket_name"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "parquet"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_KMS"),
),
},
},
})
}

func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) {
resourceName := "aws_dms_endpoint.dms_endpoint"
randId := acctest.RandString(8) + "-dynamodb"
Expand Down Expand Up @@ -692,6 +760,112 @@ EOF
`, randId)
}

func dmsEndpointS3ConfigEncrypted(randId string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}

resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "target"
engine_name = "s3"
ssl_mode = "none"
extra_connection_attributes = ""

tags = {
Name = "tf-test-s3-endpoint-%[1]s"
Update = "to-update"
Remove = "to-remove"
}

s3_settings {
service_access_role_arn = "${aws_iam_role.iam_role.arn}"
bucket_name = "bucket_name"
encryption_mode = "SSE_S3"
}

depends_on = ["aws_iam_role_policy.dms_s3_access"]
}
`, randId)
}

func dmsEndpointS3ConfigEncryptedUpdate(randId string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}

data "aws_kms_alias" "dms" {
name = "alias/aws/dms"
}

resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "target"
engine_name = "s3"
ssl_mode = "none"
extra_connection_attributes = "key=value;"

tags = {
Name = "tf-test-s3-endpoint-%[1]s"
Update = "updated"
Add = "added"
}

s3_settings {
service_access_role_arn = "${aws_iam_role.iam_role.arn}"
external_table_definition = "new-external_table_definition"
csv_row_delimiter = "\\r"
csv_delimiter = "."
bucket_folder = "new-bucket_folder"
bucket_name = "new-bucket_name"
compression_type = "GZIP"
data_format = "parquet"
encryption_mode = "SSE_KMS"
server_side_encryption_kms_key_id = "${data.aws_kms_alias.dms.target_key_arn}"
}
}
`, randId)
}

// Designed to test the situation when an udpate needs to occur to the s3_settings block
// but NOT the encryption_mode attribute. This tests we can update s3_settings without angering the AWS API
// which refuses any modification with encryption_mode=SSE_KMS (even if it was set before) unless encryption_mode is
// omitted from the modify request. We set here by removing GZIP compression.
func dmsEndpointS3ConfigEncryptedUpdateMaintainKms(randId string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}

data "aws_kms_alias" "dms" {
name = "alias/aws/dms"
}

resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "target"
engine_name = "s3"
ssl_mode = "none"
extra_connection_attributes = "key=value;"

tags = {
Name = "tf-test-s3-endpoint-%[1]s"
Update = "updated"
Add = "added"
}

s3_settings {
service_access_role_arn = "${aws_iam_role.iam_role.arn}"
external_table_definition = "new-external_table_definition"
csv_row_delimiter = "\\r"
csv_delimiter = "."
bucket_folder = "new-bucket_folder"
bucket_name = "new-bucket_name"
compression_type = ""
data_format = "parquet"
encryption_mode = "SSE_KMS"
server_side_encryption_kms_key_id = "${data.aws_kms_alias.dms.target_key_arn}"
}
}
`, randId)
}

func dmsEndpointS3Config(randId string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}
Expand All @@ -716,7 +890,11 @@ resource "aws_dms_endpoint" "dms_endpoint" {

depends_on = [aws_iam_role_policy.dms_s3_access]
}
`, randId)
}

func dmsEndpointS3ConfigBase(randId string) string {
return fmt.Sprintf(`
resource "aws_iam_role" "iam_role" {
name = "tf-test-iam-s3-role-%[1]s"

Expand Down
Loading