
Commit 9b0cd00

Merge pull request #27616 from kristofmartens/b_aws_appflow_flow_s3_output_add_preserve_source_data_typing
Add the option to preserve the source data type for S3 output format…
2 parents 4f68756 + 07201e9 commit 9b0cd00

File tree: 4 files changed (+133, -0 lines)

.changelog/27616.txt (+3)

@@ -0,0 +1,3 @@
+```release-note:enhancement
+resource/aws_appflow_flow: Add attribute `preserve_source_data_typing` to `s3_output_format_config` in `s3`
+```

internal/service/appflow/flow.go (+12)

@@ -384,6 +384,10 @@ func ResourceFlow() *schema.Resource {
                         },
                     },
                 },
+                "preserve_source_data_typing": {
+                    Type:     schema.TypeBool,
+                    Optional: true,
+                },
             },
         },
     },
@@ -1726,6 +1730,10 @@ func expandS3OutputFormatConfig(tfMap map[string]interface{}) *appflow.S3OutputF
         a.PrefixConfig = expandPrefixConfig(v[0].(map[string]interface{}))
     }

+    if v, ok := tfMap["preserve_source_data_typing"].(bool); ok {
+        a.PreserveSourceDataTyping = aws.Bool(v)
+    }
+
     return a
 }
@@ -2830,6 +2838,10 @@ func flattenS3OutputFormatConfig(s3OutputFormatConfig *appflow.S3OutputFormatCon
         m["prefix_config"] = []interface{}{flattenPrefixConfig(v)}
     }

+    if v := s3OutputFormatConfig.PreserveSourceDataTyping; v != nil {
+        m["preserve_source_data_typing"] = aws.BoolValue(v)
+    }
+
     return m
 }
internal/service/appflow/flow_test.go (+117)

@@ -68,6 +68,53 @@ func TestAccAppFlowFlow_basic(t *testing.T) {
     })
 }

+func TestAccAppFlowFlow_S3_outputFormatConfig_ParquetFileType(t *testing.T) {
+    ctx := acctest.Context(t)
+    var flowOutput appflow.FlowDefinition
+    rSourceName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+    rDestinationName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+    rFlowName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+    resourceName := "aws_appflow_flow.test"
+    scheduleStartTime := time.Now().UTC().AddDate(0, 0, 1).Format(time.RFC3339)
+
+    resource.ParallelTest(t, resource.TestCase{
+        PreCheck:                 func() { acctest.PreCheck(t) },
+        ErrorCheck:               acctest.ErrorCheck(t, appflow.EndpointsID),
+        ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
+        CheckDestroy:             testAccCheckFlowDestroy(ctx),
+        Steps: []resource.TestStep{
+            {
+                Config: testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, "PARQUET", true),
+                Check: resource.ComposeAggregateTestCheckFunc(
+                    testAccCheckFlowExists(ctx, resourceName, &flowOutput),
+                    resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.#"),
+                    resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.connector_type"),
+                    resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.destination_connector_properties.#"),
+                    resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.preserve_source_data_typing", "true"),
+                    resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.file_type", "PARQUET"),
+                    resource.TestCheckResourceAttrSet(resourceName, "task.#"),
+                    resource.TestCheckResourceAttrSet(resourceName, "task.0.source_fields.#"),
+                    resource.TestCheckResourceAttrSet(resourceName, "task.0.task_type"),
+                ),
+            },
+            {
+                Config: testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, "PARQUET", false),
+                Check: resource.ComposeAggregateTestCheckFunc(
+                    testAccCheckFlowExists(ctx, resourceName, &flowOutput),
+                    resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.#"),
+                    resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.connector_type"),
+                    resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.destination_connector_properties.#"),
+                    resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.preserve_source_data_typing", "false"),
+                    resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.file_type", "PARQUET"),
+                    resource.TestCheckResourceAttrSet(resourceName, "task.#"),
+                    resource.TestCheckResourceAttrSet(resourceName, "task.0.source_fields.#"),
+                    resource.TestCheckResourceAttrSet(resourceName, "task.0.task_type"),
+                ),
+            },
+        },
+    })
+}
+
 func TestAccAppFlowFlow_update(t *testing.T) {
     ctx := acctest.Context(t)
     var flowOutput appflow.FlowDefinition
@@ -344,6 +391,76 @@ resource "aws_appflow_flow" "test" {
     )
 }

+func testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, fileType string, preserveSourceDataTyping bool) string {
+    return acctest.ConfigCompose(
+        testAccFlowConfig_base(rSourceName, rDestinationName),
+        fmt.Sprintf(`
+resource "aws_appflow_flow" "test" {
+  name = %[1]q
+
+  source_flow_config {
+    connector_type = "S3"
+    source_connector_properties {
+      s3 {
+        bucket_name   = aws_s3_bucket_policy.test_source.bucket
+        bucket_prefix = "flow"
+      }
+    }
+  }
+
+  destination_flow_config {
+    connector_type = "S3"
+    destination_connector_properties {
+      s3 {
+        bucket_name = aws_s3_bucket_policy.test_destination.bucket
+
+        s3_output_format_config {
+          prefix_config {
+            prefix_type = "PATH"
+          }
+
+          file_type                   = %[3]q
+          preserve_source_data_typing = %[4]t
+
+          aggregation_config {
+            aggregation_type = "None"
+          }
+        }
+      }
+    }
+  }
+
+  task {
+    source_fields     = ["testField"]
+    destination_field = "testField"
+    task_type         = "Map"
+
+    task_properties = {
+      "DESTINATION_DATA_TYPE" = "string"
+      "SOURCE_DATA_TYPE"      = "string"
+    }
+
+    connector_operator {
+      s3 = "NO_OP"
+    }
+  }
+
+  trigger_config {
+    trigger_type = "Scheduled"
+
+    trigger_properties {
+      scheduled {
+        data_pull_mode      = "Incremental"
+        schedule_expression = "rate(3hours)"
+        schedule_start_time = %[2]q
+      }
+    }
+  }
+}
+`, rFlowName, scheduleStartTime, fileType, preserveSourceDataTyping),
+    )
+}
+
 func testAccFlowConfig_update(rSourceName string, rDestinationName string, rFlowName string, description string) string {
     return acctest.ConfigCompose(
         testAccFlowConfig_base(rSourceName, rDestinationName),

website/docs/r/appflow_flow.html.markdown (+1)

@@ -201,6 +201,7 @@ EventBridge, Honeycode, and Marketo destination properties all support the follo
 * `aggregation_config` - (Optional) Aggregation settings that you can use to customize the output format of your flow data. See [Aggregation Config](#aggregation-config) for more details.
 * `file_type` - (Optional) File type that Amazon AppFlow places in the Amazon S3 bucket. Valid values are `CSV`, `JSON`, and `PARQUET`.
 * `prefix_config` - (Optional) Determines the prefix that Amazon AppFlow applies to the folder name in the Amazon S3 bucket. You can name folders according to the flow frequency and date. See [Prefix Config](#prefix-config) for more details.
+* `preserve_source_data_typing` - (Optional, Boolean) Whether the data types from the source system need to be preserved (Only valid for `Parquet` file type)

 ##### Salesforce Destination Properties

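To make the documented argument concrete, here is a minimal configuration sketch of `preserve_source_data_typing` inside an `aws_appflow_flow` destination block, following the schema and documentation entry added by this commit. The resource label, flow name, and bucket name are placeholders, and the source, task, and trigger blocks a real flow also needs are omitted (the acceptance-test configuration above shows a complete flow):

resource "aws_appflow_flow" "example" {
  name = "example-flow"

  # source_flow_config, task, and trigger_config blocks are required for a
  # working flow but are omitted in this sketch.

  destination_flow_config {
    connector_type = "S3"

    destination_connector_properties {
      s3 {
        bucket_name = "example-destination-bucket" # placeholder bucket name

        s3_output_format_config {
          # Per the docs entry above, source data types are only preserved
          # when the output file type is PARQUET.
          file_type                   = "PARQUET"
          preserve_source_data_typing = true
        }
      }
    }
  }
}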