Skip to content

Commit

Permalink
init MatcherV2 for Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Aug 27, 2024
1 parent 84599f9 commit cfd9602
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions internal/sink/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/internal/metrics"
"github.com/xmidt-org/webhook-schema"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
)
Expand All @@ -36,13 +37,23 @@ type Matcher interface {
getUrls() *ring.Ring
}

// MatcherV1 holds the matching information related to RegistryV1
type MatcherV1 struct {
events []*regexp.Regexp
matcher []*regexp.Regexp
urls *ring.Ring
CommonWebhook
}

// MatcherV2 holds the matching information related to RegistryV2
// TODO: will have to determine if we need a Matcher specifically for Kafka and another for the new webhook
// FOR NOW: leaving as one matcher
type MatcherV2 struct {
matcher []webhook.FieldRegex
urls *ring.Ring
CommonWebhook
}

type CommonWebhook struct {
mutex sync.RWMutex
logger *zap.Logger
Expand All @@ -58,6 +69,13 @@ func NewMatcher(l ancla.Register, logger *zap.Logger) (Matcher, error) {
return nil, err
}
return m, nil
case *ancla.RegistryV2:
m := &MatcherV2{}
m.logger = logger
if err := m.update(*v); err != nil {
return nil, err
}
return m, nil
default:
return nil, fmt.Errorf("invalid listener")
}
Expand Down Expand Up @@ -159,7 +177,7 @@ func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool {

var (
matchEvent = false
matchDevice = false
matchDevice = true
)
for _, eventRegex := range events {
if eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) {
Expand All @@ -172,10 +190,13 @@ func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool {
return false
}

for _, deviceRegex := range matcher {
if deviceRegex.MatchString(msg.Source) || deviceRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) {
matchDevice = true
break
if matcher != nil{
matchDevice = false
for _, deviceRegex := range matcher {
if deviceRegex.MatchString(msg.Source) || deviceRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) {
matchDevice = true
break
}
}
}

Expand All @@ -194,3 +215,19 @@ func (m1 *MatcherV1) getUrls() (urls *ring.Ring) {
m1.urls = m1.urls.Next()
return
}

// TODO: need to add in logic for update
func (m2 *MatcherV2) update(l ancla.RegistryV2) error {
return nil
}

// TODO: need to add in logic for IsMatch
func (m2 *MatcherV2) IsMatch(msg *wrp.Message) bool {
return true
}

// TODO: getUrls will probably be removed from the Matcher Interface
//TODO: want to move the getUrls logic and the urls field to the sink instead of the matcher
func (m2 *MatcherV2) getUrls() (urls *ring.Ring) {
return
}

0 comments on commit cfd9602

Please sign in to comment.