Skip to content

Commit 0923108

Browse files
authored
Merge pull request #34487 from hashicorp/b-pipes_pipe_source_parameters
r/aws_pipes_pipe: fix zero value vs `null` value on update
2 parents e60d085 + c2047ab commit 0923108

File tree

3 files changed

+90
-9
lines changed

3 files changed

+90
-9
lines changed

.changelog/34487.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
resource/aws_pipes_pipe: Fix error when zero value is sent to `source_parameters` on update
3+
```

internal/service/pipes/pipe_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,54 @@ func TestAccPipesPipe_kinesisSourceAndTarget(t *testing.T) {
708708
ImportState: true,
709709
ImportStateVerify: true,
710710
},
711+
{
712+
Config: testAccPipeConfig_updateKinesis(rName, 10),
713+
Check: resource.ComposeAggregateTestCheckFunc(
714+
testAccCheckPipeExists(ctx, resourceName, &pipe),
715+
acctest.MatchResourceAttrRegionalARN(resourceName, "arn", "pipes", regexache.MustCompile(regexp.QuoteMeta(`pipe/`+rName))),
716+
resource.TestCheckResourceAttr(resourceName, "description", "Managed by Terraform"),
717+
resource.TestCheckResourceAttr(resourceName, "desired_state", "RUNNING"),
718+
resource.TestCheckResourceAttr(resourceName, "enrichment", ""),
719+
resource.TestCheckResourceAttr(resourceName, "enrichment_parameters.#", "0"),
720+
resource.TestCheckResourceAttr(resourceName, "name", rName),
721+
resource.TestCheckResourceAttrPair(resourceName, "role_arn", "aws_iam_role.test", "arn"),
722+
resource.TestCheckResourceAttrPair(resourceName, "source", "aws_kinesis_stream.source", "arn"),
723+
resource.TestCheckResourceAttr(resourceName, "source_parameters.#", "1"),
724+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.activemq_broker_parameters.#", "0"),
725+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.dynamodb_stream_parameters.#", "0"),
726+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.filter_criteria.#", "0"),
727+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.#", "1"),
728+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.batch_size", "10"),
729+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.dead_letter_config.#", "0"),
730+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.maximum_batching_window_in_seconds", "0"),
731+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.maximum_record_age_in_seconds", "0"),
732+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.maximum_retry_attempts", "0"),
733+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.on_partial_batch_item_failure", ""),
734+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.parallelization_factor", "0"),
735+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.starting_position", "LATEST"),
736+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.kinesis_stream_parameters.0.starting_position_timestamp", ""),
737+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.managed_streaming_kafka_parameters.#", "0"),
738+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.rabbitmq_broker_parameters.#", "0"),
739+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.self_managed_kafka_parameters.#", "0"),
740+
resource.TestCheckResourceAttr(resourceName, "source_parameters.0.sqs_queue_parameters.#", "0"),
741+
resource.TestCheckResourceAttr(resourceName, "tags.%", "0"),
742+
resource.TestCheckResourceAttrPair(resourceName, "target", "aws_kinesis_stream.target", "arn"),
743+
resource.TestCheckResourceAttr(resourceName, "target_parameters.#", "1"),
744+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.batch_job_parameters.#", "0"),
745+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.cloudwatch_logs_parameters.#", "0"),
746+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.ecs_task_parameters.#", "0"),
747+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.eventbridge_event_bus_parameters.#", "0"),
748+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.http_parameters.#", "0"),
749+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.input_template", ""),
750+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.kinesis_stream_parameters.#", "1"),
751+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.kinesis_stream_parameters.0.partition_key", "test"),
752+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.lambda_function_parameters.#", "0"),
753+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.redshift_data_parameters.#", "0"),
754+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.sagemaker_pipeline_parameters.#", "0"),
755+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.sqs_queue_parameters.#", "0"),
756+
resource.TestCheckResourceAttr(resourceName, "target_parameters.0.step_function_state_machine_parameters.#", "0"),
757+
),
758+
},
711759
},
712760
})
713761
}
@@ -2111,6 +2159,36 @@ resource "aws_pipes_pipe" "test" {
21112159
`, rName))
21122160
}
21132161

2162+
func testAccPipeConfig_updateKinesis(rName string, batchSize int) string {
2163+
return acctest.ConfigCompose(
2164+
testAccPipeConfig_base(rName),
2165+
testAccPipeConfig_baseKinesisSource(rName),
2166+
testAccPipeConfig_baseKinesisTarget(rName),
2167+
fmt.Sprintf(`
2168+
resource "aws_pipes_pipe" "test" {
2169+
depends_on = [aws_iam_role_policy.source, aws_iam_role_policy.target]
2170+
2171+
name = %[1]q
2172+
role_arn = aws_iam_role.test.arn
2173+
source = aws_kinesis_stream.source.arn
2174+
target = aws_kinesis_stream.target.arn
2175+
2176+
source_parameters {
2177+
kinesis_stream_parameters {
2178+
batch_size = %[2]d
2179+
starting_position = "LATEST"
2180+
}
2181+
}
2182+
2183+
target_parameters {
2184+
kinesis_stream_parameters {
2185+
partition_key = "test"
2186+
}
2187+
}
2188+
}
2189+
`, rName, batchSize))
2190+
}
2191+
21142192
func testAccPipeConfig_basicDynamoDBSourceCloudWatchLogsTarget(rName string) string {
21152193
return acctest.ConfigCompose(
21162194
testAccPipeConfig_base(rName),

internal/service/pipes/source_parameters.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,7 @@ func expandUpdatePipeSourceActiveMQBrokerParameters(tfMap map[string]interface{}
735735

736736
apiObject := &types.UpdatePipeSourceActiveMQBrokerParameters{}
737737

738-
if v, ok := tfMap["batch_size"].(int); ok {
738+
if v, ok := tfMap["batch_size"].(int); ok && v != 0 {
739739
apiObject.BatchSize = aws.Int32(int32(v))
740740
}
741741

@@ -815,7 +815,7 @@ func expandUpdatePipeSourceDynamoDBStreamParameters(tfMap map[string]interface{}
815815

816816
apiObject := &types.UpdatePipeSourceDynamoDBStreamParameters{}
817817

818-
if v, ok := tfMap["batch_size"].(int); ok {
818+
if v, ok := tfMap["batch_size"].(int); ok && v != 0 {
819819
apiObject.BatchSize = aws.Int32(int32(v))
820820
}
821821

@@ -841,7 +841,7 @@ func expandUpdatePipeSourceDynamoDBStreamParameters(tfMap map[string]interface{}
841841
apiObject.OnPartialBatchItemFailure = types.OnPartialBatchItemFailureStreams(v)
842842
}
843843

844-
if v, ok := tfMap["parallelization_factor"].(int); ok {
844+
if v, ok := tfMap["parallelization_factor"].(int); ok && v != 0 {
845845
apiObject.ParallelizationFactor = aws.Int32(int32(v))
846846
}
847847

@@ -917,7 +917,7 @@ func expandUpdatePipeSourceKinesisStreamParameters(tfMap map[string]interface{})
917917

918918
apiObject := &types.UpdatePipeSourceKinesisStreamParameters{}
919919

920-
if v, ok := tfMap["batch_size"].(int); ok {
920+
if v, ok := tfMap["batch_size"].(int); ok && v != 0 {
921921
apiObject.BatchSize = aws.Int32(int32(v))
922922
}
923923

@@ -931,7 +931,7 @@ func expandUpdatePipeSourceKinesisStreamParameters(tfMap map[string]interface{})
931931
apiObject.MaximumBatchingWindowInSeconds = aws.Int32(int32(v))
932932
}
933933

934-
if v, ok := tfMap["maximum_record_age_in_seconds"].(int); ok {
934+
if v, ok := tfMap["maximum_record_age_in_seconds"].(int); ok && v != 0 {
935935
apiObject.MaximumRecordAgeInSeconds = aws.Int32(int32(v))
936936
}
937937

@@ -943,7 +943,7 @@ func expandUpdatePipeSourceKinesisStreamParameters(tfMap map[string]interface{})
943943
apiObject.OnPartialBatchItemFailure = types.OnPartialBatchItemFailureStreams(v)
944944
}
945945

946-
if v, ok := tfMap["parallelization_factor"].(int); ok {
946+
if v, ok := tfMap["parallelization_factor"].(int); ok && v != 0 {
947947
apiObject.ParallelizationFactor = aws.Int32(int32(v))
948948
}
949949

@@ -991,7 +991,7 @@ func expandUpdatePipeSourceManagedStreamingKafkaParameters(tfMap map[string]inte
991991

992992
apiObject := &types.UpdatePipeSourceManagedStreamingKafkaParameters{}
993993

994-
if v, ok := tfMap["batch_size"].(int); ok {
994+
if v, ok := tfMap["batch_size"].(int); ok && v != 0 {
995995
apiObject.BatchSize = aws.Int32(int32(v))
996996
}
997997

@@ -1067,7 +1067,7 @@ func expandUpdatePipeSourceRabbitMQBrokerParameters(tfMap map[string]interface{}
10671067

10681068
apiObject := &types.UpdatePipeSourceRabbitMQBrokerParameters{}
10691069

1070-
if v, ok := tfMap["batch_size"].(int); ok {
1070+
if v, ok := tfMap["batch_size"].(int); ok && v != 0 {
10711071
apiObject.BatchSize = aws.Int32(int32(v))
10721072
}
10731073

@@ -1243,7 +1243,7 @@ func expandUpdatePipeSourceSQSQueueParameters(tfMap map[string]interface{}) *typ
12431243

12441244
apiObject := &types.UpdatePipeSourceSqsQueueParameters{}
12451245

1246-
if v, ok := tfMap["batch_size"].(int); ok {
1246+
if v, ok := tfMap["batch_size"].(int); ok && v != 0 {
12471247
apiObject.BatchSize = aws.Int32(int32(v))
12481248
}
12491249

0 commit comments

Comments
 (0)