Skip to content

Commit

Permalink
Merge pull request #21937 from roberth-k/f-aws_lambda_event_source_ma…
Browse files Browse the repository at this point in the history
…pping-filter_criteria

r/aws_lambda_event_source_mapping: add filter_criteria attribute
  • Loading branch information
ewbankkit authored Dec 2, 2021
2 parents f79538d + 2b64999 commit 8dfef41
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/21937.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `filter_criteria` argument
```
145 changes: 145 additions & 0 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,30 @@ func ResourceEventSourceMapping() *schema.Resource {
ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"},
},

"filter_criteria": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"filter": {
Type: schema.TypeSet,
Optional: true,
MaxItems: 5,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"pattern": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringLenBetween(0, 4096),
},
},
},
},
},
},
},

"function_arn": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -315,6 +339,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})
target = v
}

if v, ok := d.GetOk("filter_criteria"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.FilterCriteria = expandLambdaFilterCriteria(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("function_response_types"); ok && v.(*schema.Set).Len() > 0 {
input.FunctionResponseTypes = flex.ExpandStringSet(v.(*schema.Set))
}
Expand Down Expand Up @@ -442,6 +470,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
d.Set("destination_config", nil)
}
d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn)
if v := eventSourceMappingConfiguration.FilterCriteria; v != nil {
if err := d.Set("filter_criteria", []interface{}{flattenLambdaFilterCriteria(v)}); err != nil {
return fmt.Errorf("error setting filter criteria: %w", err)
}
} else {
d.Set("filter_criteria", nil)
}
d.Set("function_arn", eventSourceMappingConfiguration.FunctionArn)
d.Set("function_name", eventSourceMappingConfiguration.FunctionArn)
d.Set("function_response_types", aws.StringValueSlice(eventSourceMappingConfiguration.FunctionResponseTypes))
Expand Down Expand Up @@ -518,6 +553,15 @@ func resourceEventSourceMappingUpdate(d *schema.ResourceData, meta interface{})
input.Enabled = aws.Bool(d.Get("enabled").(bool))
}

if d.HasChange("filter_criteria") {
if v, ok := d.GetOk("filter_criteria"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.FilterCriteria = expandLambdaFilterCriteria(v.([]interface{})[0].(map[string]interface{}))
} else {
// AWS ignores the removal if this is left as nil.
input.FilterCriteria = &lambda.FilterCriteria{}
}
}

if d.HasChange("function_name") {
input.FunctionName = aws.String(d.Get("function_name").(string))
}
Expand Down Expand Up @@ -798,3 +842,104 @@ func flattenLambdaSourceAccessConfigurations(apiObjects []*lambda.SourceAccessCo

return tfList
}

func expandLambdaFilterCriteria(tfMap map[string]interface{}) *lambda.FilterCriteria {
if tfMap == nil {
return nil
}

apiObject := &lambda.FilterCriteria{}

if v, ok := tfMap["filter"].(*schema.Set); ok && v.Len() > 0 {
apiObject.Filters = expandLambdaFilters(v.List())
}

return apiObject
}

func flattenLambdaFilterCriteria(apiObject *lambda.FilterCriteria) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.Filters; len(v) > 0 {
tfMap["filter"] = flattenLambdaFilters(v)
}

return tfMap
}

func expandLambdaFilters(tfList []interface{}) []*lambda.Filter {
if len(tfList) == 0 {
return nil
}

var apiObjects []*lambda.Filter

for _, tfMapRaw := range tfList {
tfMap, ok := tfMapRaw.(map[string]interface{})

if !ok {
continue
}

apiObject := expandLambdaFilter(tfMap)

if apiObject == nil {
continue
}

apiObjects = append(apiObjects, apiObject)
}

return apiObjects
}

func flattenLambdaFilters(apiObjects []*lambda.Filter) []interface{} {
if len(apiObjects) == 0 {
return nil
}

var tfList []interface{}

for _, apiObject := range apiObjects {
if apiObject == nil {
continue
}

tfList = append(tfList, flattenLambdaFilter(apiObject))
}

return tfList
}

func expandLambdaFilter(tfMap map[string]interface{}) *lambda.Filter {
if tfMap == nil {
return nil
}

apiObject := &lambda.Filter{}

if v, ok := tfMap["pattern"].(string); ok {
// The API permits patterns of length >= 0, so accept the empty string.
apiObject.Pattern = aws.String(v)
}

return apiObject
}

func flattenLambdaFilter(apiObject *lambda.Filter) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.Pattern; v != nil {
tfMap["pattern"] = aws.StringValue(v)
}

return tfMap
}
114 changes: 114 additions & 0 deletions internal/service/lambda/event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestAccLambdaEventSourceMapping_SQS_basic(t *testing.T) {
resource.TestCheckResourceAttrPair(resourceName, "function_name", functionResourceName, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "0"),
),
},
// batch_size became optional. Ensure that if the user supplies the default
Expand Down Expand Up @@ -876,6 +877,76 @@ func TestAccLambdaEventSourceMapping_rabbitMQ(t *testing.T) {
})
}

func TestAccLambdaEventSourceMapping_SQS_filterCriteria(t *testing.T) {
var conf lambda.EventSourceMappingConfiguration
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_lambda_event_source_mapping.test"
pattern1 := "{\"Region\": [{\"prefix\": \"us-\"}]}"
pattern2 := "{\"Location\": [\"New York\"], \"Day\": [\"Monday\"]}"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, lambda.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccEventSourceMappingSQSFilterCriteria_1(rName, pattern1),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "1"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.0.filter.#", "1"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern1}),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingSQSFilterCriteria_2(rName, pattern1, pattern2),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "1"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.0.filter.#", "2"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern1}),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern2}),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingSQSFilterCriteria_3(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "0"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingSQSFilterCriteria_1(rName, pattern1),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "1"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.0.filter.#", "1"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern1}),
),
},
},
})
}

func testAccCheckEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := acctest.Provider.Meta().(*conns.AWSClient).LambdaConn
Expand Down Expand Up @@ -1832,3 +1903,46 @@ func testAccPreCheckSecretsManager(t *testing.T) {
t.Fatalf("unexpected PreCheck error: %s", err)
}
}

func testAccEventSourceMappingSQSFilterCriteria_1(rName string, pattern1 string) string {
return acctest.ConfigCompose(testAccEventSourceMappingSQSBaseConfig(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
filter_criteria {
filter {
pattern = %q
}
}
}
`, pattern1))
}

func testAccEventSourceMappingSQSFilterCriteria_2(rName string, pattern1, pattern2 string) string {
return acctest.ConfigCompose(testAccEventSourceMappingSQSBaseConfig(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
filter_criteria {
filter {
pattern = %q
}
filter {
pattern = %q
}
}
}
`, pattern1, pattern2))
}

func testAccEventSourceMappingSQSFilterCriteria_3(rName string) string {
return acctest.ConfigCompose(testAccEventSourceMappingSQSBaseConfig(rName), `
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
}
`)
}
29 changes: 29 additions & 0 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ resource "aws_lambda_event_source_mapping" "example" {
}
```

### SQS with event filter

```terraform
resource "aws_lambda_event_source_mapping" "example" {
event_source_arn = aws_sqs_queue.sqs_queue_test.arn
function_name = aws_lambda_function.example.arn
filter_criteria {
filter {
pattern = jsonencode({
body = {
Temperature : [{ numeric : [">", 0, "<=", 100] }]
Location : ["New York"]
}
})
}
}
}
```

### Amazon MQ (ActiveMQ)

```terraform
Expand Down Expand Up @@ -132,6 +152,7 @@ resource "aws_lambda_event_source_mapping" "example" {
* `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below.
* `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`.
* `event_source_arn` - (Optional) The event source ARN - this is required for Kinesis stream, DynamoDB stream, SQS queue, MQ broker or MSK cluster. It is incompatible with a Self Managed Kafka source.
* `filter_criteria` - (Optional) The criteria to use for [event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) Kinesis stream, DynamoDB stream, SQS queue event sources. Detailed below.
* `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events.
* `function_response_types` - (Optional) A list of current response type enums applied to the event source mapping for [AWS Lambda checkpointing](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). Only available for stream sources (DynamoDB and Kinesis). Valid values: `ReportBatchItemFailures`.
* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function. Only available for stream sources (DynamoDB and Kinesis) and SQS standard queues.
Expand All @@ -154,6 +175,14 @@ resource "aws_lambda_event_source_mapping" "example" {

* `destination_arn` - (Required) The Amazon Resource Name (ARN) of the destination resource.

### filter_criteria Configuration Block

* `filter` - (Optional) A set of up to 5 filter. If an event satisfies at least one, Lambda sends the event to the function or adds it to the next batch. Detailed below.

#### filter_criteria filter Configuration Block

* `pattern` - (Optional) A filter pattern up to 4096 characters. See [Filter Rule Syntax](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax).

### self_managed_event_source Configuration Block

* `endpoints` - (Required) A map of endpoints for the self managed source. For Kafka self-managed sources, the key should be `KAFKA_BOOTSTRAP_SERVERS` and the value should be a string with a comma separated list of broker endpoints.
Expand Down

0 comments on commit 8dfef41

Please sign in to comment.