Skip to content

Commit 60ec508

Browse files
authored
Merge pull request #26560 from A122943/patch-1
aws_lambda_event_source_mapping: Add Kafka config blocks + consumer_group_id
2 parents 5bf2dfa + 55837a4 commit 60ec508

File tree

4 files changed

+290
-45
lines changed

4 files changed

+290
-45
lines changed

.changelog/26560.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
resource/aws_lambda_event_source_mapping: Add `amazon_managed_kafka_event_source_config` and `self_managed_kafka_event_source_config` configuration blocks
3+
```

internal/service/lambda/event_source_mapping.go

+116-24
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,25 @@ func ResourceEventSourceMapping() *schema.Resource {
3333
},
3434

3535
Schema: map[string]*schema.Schema{
36+
"amazon_managed_kafka_event_source_config": {
37+
Type: schema.TypeList,
38+
Optional: true,
39+
Computed: true,
40+
ForceNew: true,
41+
MaxItems: 1,
42+
ConflictsWith: []string{"self_managed_event_source", "self_managed_kafka_event_source_config"},
43+
Elem: &schema.Resource{
44+
Schema: map[string]*schema.Schema{
45+
"consumer_group_id": {
46+
Type: schema.TypeString,
47+
Optional: true,
48+
Computed: true,
49+
ForceNew: true,
50+
ValidateFunc: validation.StringLenBetween(1, 200),
51+
},
52+
},
53+
},
54+
},
3655
"batch_size": {
3756
Type: schema.TypeInt,
3857
Optional: true,
@@ -68,12 +87,10 @@ func ResourceEventSourceMapping() *schema.Resource {
6887
return old == new
6988
},
7089
},
71-
7290
"bisect_batch_on_function_error": {
7391
Type: schema.TypeBool,
7492
Optional: true,
7593
},
76-
7794
"destination_config": {
7895
Type: schema.TypeList,
7996
Optional: true,
@@ -98,20 +115,17 @@ func ResourceEventSourceMapping() *schema.Resource {
98115
},
99116
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
100117
},
101-
102118
"enabled": {
103119
Type: schema.TypeBool,
104120
Optional: true,
105121
Default: true,
106122
},
107-
108123
"event_source_arn": {
109124
Type: schema.TypeString,
110125
Optional: true,
111126
ForceNew: true,
112127
ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"},
113128
},
114-
115129
"filter_criteria": {
116130
Type: schema.TypeList,
117131
Optional: true,
@@ -135,12 +149,10 @@ func ResourceEventSourceMapping() *schema.Resource {
135149
},
136150
},
137151
},
138-
139152
"function_arn": {
140153
Type: schema.TypeString,
141154
Computed: true,
142155
},
143-
144156
"function_name": {
145157
Type: schema.TypeString,
146158
Required: true,
@@ -152,7 +164,6 @@ func ResourceEventSourceMapping() *schema.Resource {
152164
return (oldFunctionName == new && oldFunctionNameErr == nil) || (newFunctionName == old && newFunctionNameErr == nil)
153165
},
154166
},
155-
156167
"function_response_types": {
157168
Type: schema.TypeSet,
158169
Optional: true,
@@ -161,22 +172,18 @@ func ResourceEventSourceMapping() *schema.Resource {
161172
ValidateFunc: validation.StringInSlice(lambda.FunctionResponseType_Values(), false),
162173
},
163174
},
164-
165175
"last_modified": {
166176
Type: schema.TypeString,
167177
Computed: true,
168178
},
169-
170179
"last_processing_result": {
171180
Type: schema.TypeString,
172181
Computed: true,
173182
},
174-
175183
"maximum_batching_window_in_seconds": {
176184
Type: schema.TypeInt,
177185
Optional: true,
178186
},
179-
180187
"maximum_record_age_in_seconds": {
181188
Type: schema.TypeInt,
182189
Optional: true,
@@ -186,21 +193,18 @@ func ResourceEventSourceMapping() *schema.Resource {
186193
validation.IntBetween(60, 604_800),
187194
),
188195
},
189-
190196
"maximum_retry_attempts": {
191197
Type: schema.TypeInt,
192198
Optional: true,
193199
Computed: true,
194200
ValidateFunc: validation.IntBetween(-1, 10_000),
195201
},
196-
197202
"parallelization_factor": {
198203
Type: schema.TypeInt,
199204
Optional: true,
200205
ValidateFunc: validation.IntBetween(1, 10),
201206
Computed: true,
202207
},
203-
204208
"queues": {
205209
Type: schema.TypeSet,
206210
Optional: true,
@@ -210,7 +214,6 @@ func ResourceEventSourceMapping() *schema.Resource {
210214
ValidateFunc: validation.StringLenBetween(1, 1000),
211215
},
212216
},
213-
214217
"self_managed_event_source": {
215218
Type: schema.TypeList,
216219
Optional: true,
@@ -241,7 +244,25 @@ func ResourceEventSourceMapping() *schema.Resource {
241244
},
242245
ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"},
243246
},
244-
247+
"self_managed_kafka_event_source_config": {
248+
Type: schema.TypeList,
249+
Optional: true,
250+
Computed: true,
251+
ForceNew: true,
252+
MaxItems: 1,
253+
ConflictsWith: []string{"event_source_arn", "amazon_managed_kafka_event_source_config"},
254+
Elem: &schema.Resource{
255+
Schema: map[string]*schema.Schema{
256+
"consumer_group_id": {
257+
Type: schema.TypeString,
258+
Optional: true,
259+
Computed: true,
260+
ForceNew: true,
261+
ValidateFunc: validation.StringLenBetween(1, 200),
262+
},
263+
},
264+
},
265+
},
245266
"source_access_configuration": {
246267
Type: schema.TypeSet,
247268
Optional: true,
@@ -260,31 +281,26 @@ func ResourceEventSourceMapping() *schema.Resource {
260281
},
261282
},
262283
},
263-
264284
"starting_position": {
265285
Type: schema.TypeString,
266286
Optional: true,
267287
ForceNew: true,
268288
ValidateFunc: validation.StringInSlice(lambda.EventSourcePosition_Values(), false),
269289
},
270-
271290
"starting_position_timestamp": {
272291
Type: schema.TypeString,
273292
Optional: true,
274293
ForceNew: true,
275294
ValidateFunc: validation.IsRFC3339Time,
276295
},
277-
278296
"state": {
279297
Type: schema.TypeString,
280298
Computed: true,
281299
},
282-
283300
"state_transition_reason": {
284301
Type: schema.TypeString,
285302
Computed: true,
286303
},
287-
288304
"topics": {
289305
Type: schema.TypeSet,
290306
Optional: true,
@@ -294,13 +310,11 @@ func ResourceEventSourceMapping() *schema.Resource {
294310
ValidateFunc: validation.StringLenBetween(1, 249),
295311
},
296312
},
297-
298313
"tumbling_window_in_seconds": {
299314
Type: schema.TypeInt,
300315
Optional: true,
301316
ValidateFunc: validation.IntBetween(0, 900),
302317
},
303-
304318
"uuid": {
305319
Type: schema.TypeString,
306320
Computed: true,
@@ -320,6 +334,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})
320334

321335
var target string
322336

337+
if v, ok := d.GetOk("amazon_managed_kafka_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
338+
input.AmazonManagedKafkaEventSourceConfig = expandAmazonManagedKafkaEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
339+
}
340+
323341
if v, ok := d.GetOk("batch_size"); ok {
324342
input.BatchSize = aws.Int64(int64(v.(int)))
325343
}
@@ -373,6 +391,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})
373391
target = "Self-Managed Apache Kafka"
374392
}
375393

394+
if v, ok := d.GetOk("self_managed_kafka_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
395+
input.SelfManagedKafkaEventSourceConfig = expandSelfManagedKafkaEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
396+
}
397+
376398
if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 {
377399
input.SourceAccessConfigurations = expandSourceAccessConfigurations(v.(*schema.Set).List())
378400
}
@@ -460,6 +482,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
460482
return fmt.Errorf("error reading Lambda Event Source Mapping (%s): %w", d.Id(), err)
461483
}
462484

485+
if eventSourceMappingConfiguration.AmazonManagedKafkaEventSourceConfig != nil {
486+
if err := d.Set("amazon_managed_kafka_event_source_config", []interface{}{flattenAmazonManagedKafkaEventSourceConfig(eventSourceMappingConfiguration.AmazonManagedKafkaEventSourceConfig)}); err != nil {
487+
return fmt.Errorf("error setting amazon_managed_kafka_event_source_config: %w", err)
488+
}
489+
} else {
490+
d.Set("amazon_managed_kafka_event_source_config", nil)
491+
}
463492
d.Set("batch_size", eventSourceMappingConfiguration.BatchSize)
464493
d.Set("bisect_batch_on_function_error", eventSourceMappingConfiguration.BisectBatchOnFunctionError)
465494
if eventSourceMappingConfiguration.DestinationConfig != nil {
@@ -498,6 +527,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
498527
} else {
499528
d.Set("self_managed_event_source", nil)
500529
}
530+
if eventSourceMappingConfiguration.SelfManagedKafkaEventSourceConfig != nil {
531+
if err := d.Set("self_managed_kafka_event_source_config", []interface{}{flattenSelfManagedKafkaEventSourceConfig(eventSourceMappingConfiguration.SelfManagedKafkaEventSourceConfig)}); err != nil {
532+
return fmt.Errorf("error setting self_managed_kafka_event_source_config: %w", err)
533+
}
534+
} else {
535+
d.Set("self_managed_kafka_event_source_config", nil)
536+
}
501537
if err := d.Set("source_access_configuration", flattenSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations)); err != nil {
502538
return fmt.Errorf("error setting source_access_configuration: %w", err)
503539
}
@@ -763,6 +799,62 @@ func flattenSelfManagedEventSource(apiObject *lambda.SelfManagedEventSource) map
763799
return tfMap
764800
}
765801

802+
func expandAmazonManagedKafkaEventSourceConfig(tfMap map[string]interface{}) *lambda.AmazonManagedKafkaEventSourceConfig {
803+
if tfMap == nil {
804+
return nil
805+
}
806+
807+
apiObject := &lambda.AmazonManagedKafkaEventSourceConfig{}
808+
809+
if v, ok := tfMap["consumer_group_id"].(string); ok && v != "" {
810+
apiObject.ConsumerGroupId = aws.String(v)
811+
}
812+
813+
return apiObject
814+
}
815+
816+
func flattenAmazonManagedKafkaEventSourceConfig(apiObject *lambda.AmazonManagedKafkaEventSourceConfig) map[string]interface{} {
817+
if apiObject == nil {
818+
return nil
819+
}
820+
821+
tfMap := map[string]interface{}{}
822+
823+
if v := apiObject.ConsumerGroupId; v != nil {
824+
tfMap["consumer_group_id"] = aws.StringValue(v)
825+
}
826+
827+
return tfMap
828+
}
829+
830+
func expandSelfManagedKafkaEventSourceConfig(tfMap map[string]interface{}) *lambda.SelfManagedKafkaEventSourceConfig {
831+
if tfMap == nil {
832+
return nil
833+
}
834+
835+
apiObject := &lambda.SelfManagedKafkaEventSourceConfig{}
836+
837+
if v, ok := tfMap["consumer_group_id"].(string); ok && v != "" {
838+
apiObject.ConsumerGroupId = aws.String(v)
839+
}
840+
841+
return apiObject
842+
}
843+
844+
func flattenSelfManagedKafkaEventSourceConfig(apiObject *lambda.SelfManagedKafkaEventSourceConfig) map[string]interface{} {
845+
if apiObject == nil {
846+
return nil
847+
}
848+
849+
tfMap := map[string]interface{}{}
850+
851+
if v := apiObject.ConsumerGroupId; v != nil {
852+
tfMap["consumer_group_id"] = aws.StringValue(v)
853+
}
854+
855+
return tfMap
856+
}
857+
766858
func expandSourceAccessConfiguration(tfMap map[string]interface{}) *lambda.SourceAccessConfiguration {
767859
if tfMap == nil {
768860
return nil

0 commit comments

Comments
 (0)