Skip to content

Commit 526591f

Browse files
authored
Merge pull request #25684 from moremagic/f-aws_msk_cluster-serverless_cluster_type
Add support for AWS MSK serverless cluster type
2 parents fe4d7b0 + 6691496 commit 526591f

15 files changed

+902
-79
lines changed

.changelog/25684.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:new-resource
2+
aws_msk_serverless_cluster
3+
```

internal/provider/provider.go

+1
Original file line numberDiff line numberDiff line change
@@ -1620,6 +1620,7 @@ func New(_ context.Context) (*schema.Provider, error) {
16201620
"aws_msk_cluster": kafka.ResourceCluster(),
16211621
"aws_msk_configuration": kafka.ResourceConfiguration(),
16221622
"aws_msk_scram_secret_association": kafka.ResourceScramSecretAssociation(),
1623+
"aws_msk_serverless_cluster": kafka.ResourceServerlessCluster(),
16231624

16241625
"aws_mskconnect_connector": kafkaconnect.ResourceConnector(),
16251626
"aws_mskconnect_custom_plugin": kafkaconnect.ResourceCustomPlugin(),

internal/service/kafka/cluster.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int
925925
if d.HasChange("tags_all") {
926926
o, n := d.GetChange("tags_all")
927927

928-
if err := UpdateTags(conn, d.Id(), o, n); err != nil {
928+
if err := UpdateTagsWithContext(ctx, conn, d.Id(), o, n); err != nil {
929929
return diag.Errorf("updating MSK Cluster (%s) tags: %s", d.Id(), err)
930930
}
931931
}

internal/service/kafka/cluster_test.go

+71-48
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,77 @@ func TestAccKafkaCluster_basic(t *testing.T) {
110110
})
111111
}
112112

113+
func TestAccKafkaCluster_disappears(t *testing.T) {
114+
var cluster kafka.ClusterInfo
115+
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
116+
resourceName := "aws_msk_cluster.test"
117+
118+
resource.ParallelTest(t, resource.TestCase{
119+
PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) },
120+
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID),
121+
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
122+
CheckDestroy: testAccCheckClusterDestroy,
123+
Steps: []resource.TestStep{
124+
{
125+
Config: testAccClusterConfig_basic(rName),
126+
Check: resource.ComposeAggregateTestCheckFunc(
127+
testAccCheckClusterExists(resourceName, &cluster),
128+
acctest.CheckResourceDisappears(acctest.Provider, tfkafka.ResourceCluster(), resourceName),
129+
),
130+
ExpectNonEmptyPlan: true,
131+
},
132+
},
133+
})
134+
}
135+
136+
func TestAccKafkaCluster_tags(t *testing.T) {
137+
var cluster kafka.ClusterInfo
138+
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
139+
resourceName := "aws_msk_cluster.test"
140+
141+
resource.ParallelTest(t, resource.TestCase{
142+
PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) },
143+
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID),
144+
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
145+
CheckDestroy: testAccCheckClusterDestroy,
146+
Steps: []resource.TestStep{
147+
{
148+
Config: testAccClusterConfig_tags1(rName, "key1", "value1"),
149+
Check: resource.ComposeAggregateTestCheckFunc(
150+
testAccCheckClusterExists(resourceName, &cluster),
151+
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
152+
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"),
153+
),
154+
},
155+
{
156+
ResourceName: resourceName,
157+
ImportState: true,
158+
ImportStateVerify: true,
159+
ImportStateVerifyIgnore: []string{
160+
"current_version",
161+
},
162+
},
163+
{
164+
Config: testAccClusterConfig_tags2(rName, "key1", "value1updated", "key2", "value2"),
165+
Check: resource.ComposeAggregateTestCheckFunc(
166+
testAccCheckClusterExists(resourceName, &cluster),
167+
resource.TestCheckResourceAttr(resourceName, "tags.%", "2"),
168+
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"),
169+
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
170+
),
171+
},
172+
{
173+
Config: testAccClusterConfig_tags1(rName, "key2", "value2"),
174+
Check: resource.ComposeAggregateTestCheckFunc(
175+
testAccCheckClusterExists(resourceName, &cluster),
176+
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
177+
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
178+
),
179+
},
180+
},
181+
})
182+
}
183+
113184
func TestAccKafkaCluster_BrokerNodeGroupInfo_ebsVolumeSize(t *testing.T) {
114185
var cluster1, cluster2 kafka.ClusterInfo
115186
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
@@ -1090,54 +1161,6 @@ func TestAccKafkaCluster_kafkaVersionUpgradeWithInfo(t *testing.T) {
10901161
})
10911162
}
10921163

1093-
func TestAccKafkaCluster_tags(t *testing.T) {
1094-
var cluster kafka.ClusterInfo
1095-
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
1096-
resourceName := "aws_msk_cluster.test"
1097-
1098-
resource.ParallelTest(t, resource.TestCase{
1099-
PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) },
1100-
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID),
1101-
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
1102-
CheckDestroy: testAccCheckClusterDestroy,
1103-
Steps: []resource.TestStep{
1104-
{
1105-
Config: testAccClusterConfig_tags1(rName, "key1", "value1"),
1106-
Check: resource.ComposeAggregateTestCheckFunc(
1107-
testAccCheckClusterExists(resourceName, &cluster),
1108-
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
1109-
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"),
1110-
),
1111-
},
1112-
{
1113-
ResourceName: resourceName,
1114-
ImportState: true,
1115-
ImportStateVerify: true,
1116-
ImportStateVerifyIgnore: []string{
1117-
"current_version",
1118-
},
1119-
},
1120-
{
1121-
Config: testAccClusterConfig_tags2(rName, "key1", "value1updated", "key2", "value2"),
1122-
Check: resource.ComposeAggregateTestCheckFunc(
1123-
testAccCheckClusterExists(resourceName, &cluster),
1124-
resource.TestCheckResourceAttr(resourceName, "tags.%", "2"),
1125-
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"),
1126-
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
1127-
),
1128-
},
1129-
{
1130-
Config: testAccClusterConfig_tags1(rName, "key2", "value2"),
1131-
Check: resource.ComposeAggregateTestCheckFunc(
1132-
testAccCheckClusterExists(resourceName, &cluster),
1133-
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
1134-
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
1135-
),
1136-
},
1137-
},
1138-
})
1139-
}
1140-
11411164
func testAccCheckResourceAttrIsSortedCSV(resourceName, attributeName string) resource.TestCheckFunc {
11421165
return func(s *terraform.State) error {
11431166
is, err := acctest.PrimaryInstanceState(s, resourceName)

internal/service/kafka/find.go

+39
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,31 @@ func FindClusterByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafk
3535
return output.ClusterInfo, nil
3636
}
3737

38+
func findClusterV2ByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.Cluster, error) {
39+
input := &kafka.DescribeClusterV2Input{
40+
ClusterArn: aws.String(arn),
41+
}
42+
43+
output, err := conn.DescribeClusterV2WithContext(ctx, input)
44+
45+
if tfawserr.ErrCodeEquals(err, kafka.ErrCodeNotFoundException) {
46+
return nil, &resource.NotFoundError{
47+
LastError: err,
48+
LastRequest: input,
49+
}
50+
}
51+
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
if output == nil || output.ClusterInfo == nil {
57+
return nil, tfresource.NewEmptyResultError(input)
58+
}
59+
60+
return output.ClusterInfo, nil
61+
}
62+
3863
func FindClusterOperationByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.ClusterOperationInfo, error) {
3964
input := &kafka.DescribeClusterOperationInput{
4065
ClusterOperationArn: aws.String(arn),
@@ -102,3 +127,17 @@ func FindScramSecrets(conn *kafka.Kafka, clusterArn string) ([]*string, error) {
102127

103128
return scramSecrets, err
104129
}
130+
131+
func FindServerlessClusterByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.Cluster, error) {
132+
output, err := findClusterV2ByARN(ctx, conn, arn)
133+
134+
if err != nil {
135+
return nil, err
136+
}
137+
138+
if output.Serverless == nil {
139+
return nil, tfresource.NewEmptyResultError(arn)
140+
}
141+
142+
return output, nil
143+
}

internal/service/kafka/generate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//go:generate go run ../../generate/tags/main.go -ListTags -ServiceTagsMap -UpdateTags
1+
//go:generate go run ../../generate/tags/main.go -ServiceTagsMap -UpdateTags
22
// ONLY generate directives and package declaration! Do not add anything else to this file.
33

44
package kafka

0 commit comments

Comments
 (0)