Skip to content

Commit 07201e9

Browse files
committed
r/aws_appflow_flow: add test
1 parent 21dbb94 commit 07201e9

File tree

1 file changed

+117
-0
lines changed

1 file changed

+117
-0
lines changed

internal/service/appflow/flow_test.go

+117
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,53 @@ func TestAccAppFlowFlow_basic(t *testing.T) {
6868
})
6969
}
7070

71+
func TestAccAppFlowFlow_S3_outputFormatConfig_ParquetFileType(t *testing.T) {
72+
ctx := acctest.Context(t)
73+
var flowOutput appflow.FlowDefinition
74+
rSourceName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
75+
rDestinationName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
76+
rFlowName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
77+
resourceName := "aws_appflow_flow.test"
78+
scheduleStartTime := time.Now().UTC().AddDate(0, 0, 1).Format(time.RFC3339)
79+
80+
resource.ParallelTest(t, resource.TestCase{
81+
PreCheck: func() { acctest.PreCheck(t) },
82+
ErrorCheck: acctest.ErrorCheck(t, appflow.EndpointsID),
83+
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
84+
CheckDestroy: testAccCheckFlowDestroy(ctx),
85+
Steps: []resource.TestStep{
86+
{
87+
Config: testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, "PARQUET", true),
88+
Check: resource.ComposeAggregateTestCheckFunc(
89+
testAccCheckFlowExists(ctx, resourceName, &flowOutput),
90+
resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.#"),
91+
resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.connector_type"),
92+
resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.destination_connector_properties.#"),
93+
resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.preserve_source_data_typing", "true"),
94+
resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.file_type", "PARQUET"),
95+
resource.TestCheckResourceAttrSet(resourceName, "task.#"),
96+
resource.TestCheckResourceAttrSet(resourceName, "task.0.source_fields.#"),
97+
resource.TestCheckResourceAttrSet(resourceName, "task.0.task_type"),
98+
),
99+
},
100+
{
101+
Config: testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, "PARQUET", false),
102+
Check: resource.ComposeAggregateTestCheckFunc(
103+
testAccCheckFlowExists(ctx, resourceName, &flowOutput),
104+
resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.#"),
105+
resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.connector_type"),
106+
resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.destination_connector_properties.#"),
107+
resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.preserve_source_data_typing", "false"),
108+
resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.file_type", "PARQUET"),
109+
resource.TestCheckResourceAttrSet(resourceName, "task.#"),
110+
resource.TestCheckResourceAttrSet(resourceName, "task.0.source_fields.#"),
111+
resource.TestCheckResourceAttrSet(resourceName, "task.0.task_type"),
112+
),
113+
},
114+
},
115+
})
116+
}
117+
71118
func TestAccAppFlowFlow_update(t *testing.T) {
72119
ctx := acctest.Context(t)
73120
var flowOutput appflow.FlowDefinition
@@ -344,6 +391,76 @@ resource "aws_appflow_flow" "test" {
344391
)
345392
}
346393

394+
func testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, fileType string, preserveSourceDataTyping bool) string {
395+
return acctest.ConfigCompose(
396+
testAccFlowConfig_base(rSourceName, rDestinationName),
397+
fmt.Sprintf(`
398+
resource "aws_appflow_flow" "test" {
399+
name = %[1]q
400+
401+
source_flow_config {
402+
connector_type = "S3"
403+
source_connector_properties {
404+
s3 {
405+
bucket_name = aws_s3_bucket_policy.test_source.bucket
406+
bucket_prefix = "flow"
407+
}
408+
}
409+
}
410+
411+
destination_flow_config {
412+
connector_type = "S3"
413+
destination_connector_properties {
414+
s3 {
415+
bucket_name = aws_s3_bucket_policy.test_destination.bucket
416+
417+
s3_output_format_config {
418+
prefix_config {
419+
prefix_type = "PATH"
420+
}
421+
422+
file_type = %[3]q
423+
preserve_source_data_typing = %[4]t
424+
425+
aggregation_config {
426+
aggregation_type = "None"
427+
}
428+
}
429+
}
430+
}
431+
}
432+
433+
task {
434+
source_fields = ["testField"]
435+
destination_field = "testField"
436+
task_type = "Map"
437+
438+
task_properties = {
439+
"DESTINATION_DATA_TYPE" = "string"
440+
"SOURCE_DATA_TYPE" = "string"
441+
}
442+
443+
connector_operator {
444+
s3 = "NO_OP"
445+
}
446+
}
447+
448+
trigger_config {
449+
trigger_type = "Scheduled"
450+
451+
trigger_properties {
452+
scheduled {
453+
data_pull_mode = "Incremental"
454+
schedule_expression = "rate(3hours)"
455+
schedule_start_time = %[2]q
456+
}
457+
}
458+
}
459+
}
460+
`, rFlowName, scheduleStartTime, fileType, preserveSourceDataTyping),
461+
)
462+
}
463+
347464
func testAccFlowConfig_update(rSourceName string, rDestinationName string, rFlowName string, description string) string {
348465
return acctest.ConfigCompose(
349466
testAccFlowConfig_base(rSourceName, rDestinationName),

0 commit comments

Comments
 (0)