Skip to content

Commit 5f1f524

Browse files
committed
feat: remove eventBus usage on persistSeqNo change
1 parent ce4a85b commit 5f1f524

File tree

5 files changed

+35
-47
lines changed

5 files changed

+35
-47
lines changed

couchbase/rollback_mitigation.go

+27-26
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212

1313
"golang.org/x/sync/errgroup"
1414

15-
"github.com/asaskevich/EventBus"
16-
1715
"github.com/couchbase/gocbcore/v10"
1816

1917
"github.com/Trendyol/go-dcp/wrapper"
@@ -22,7 +20,6 @@ import (
2220

2321
"github.com/Trendyol/go-dcp/config"
2422

25-
"github.com/Trendyol/go-dcp/helpers"
2623
"github.com/Trendyol/go-dcp/logger"
2724
)
2825

@@ -62,20 +59,20 @@ func (v *vbUUIDAndSeqNo) IsOutdated(last *gocbcore.ObserveVbResult) bool {
6259
}
6360

6461
type rollbackMitigation struct {
65-
bus EventBus.Bus
66-
client Client
67-
vbUUIDMap *wrapper.ConcurrentSwissMap[uint16, gocbcore.VbUUID]
68-
configSnapshot *gocbcore.ConfigSnapshot
69-
persistedSeqNos *wrapper.ConcurrentSwissMap[uint16, []*vbUUIDAndSeqNo]
70-
observeCount *atomic.Uint32
71-
config *config.Dcp
72-
observeTimer *time.Ticker
73-
observeCloseCh chan struct{}
74-
observeCloseDoneCh chan struct{}
75-
vbIds []uint16
76-
activeGroupID int
77-
configWatchRunning bool
78-
closed bool
62+
client Client
63+
observeTimer *time.Ticker
64+
configSnapshot *gocbcore.ConfigSnapshot
65+
persistedSeqNos *wrapper.ConcurrentSwissMap[uint16, []*vbUUIDAndSeqNo]
66+
observeCount *atomic.Uint32
67+
config *config.Dcp
68+
vbUUIDMap *wrapper.ConcurrentSwissMap[uint16, gocbcore.VbUUID]
69+
observeCloseCh chan struct{}
70+
observeCloseDoneCh chan struct{}
71+
persistSeqNoDispatcher models.PersistSeqNoDispatcher
72+
vbIds []uint16
73+
activeGroupID int
74+
configWatchRunning bool
75+
closed bool
7976
}
8077

8178
func (r *rollbackMitigation) getRevEpochAndID(snapshot *gocbcore.ConfigSnapshot) (int64, int64) {
@@ -340,7 +337,7 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU
340337
replicas[replica].SetSeqNo(result.PersistSeqNo)
341338
replicas[replica].SetVbUUID(result.VbUUID)
342339

343-
r.bus.Publish(helpers.PersistSeqNoChangedBusEventName, models.PersistSeqNo{
340+
r.persistSeqNoDispatcher(&models.PersistSeqNo{
344341
VbID: vbID,
345342
SeqNo: r.getMinSeqNo(vbID),
346343
})
@@ -440,14 +437,18 @@ func (r *rollbackMitigation) Stop() {
440437
logger.Log.Info("rollback mitigation stopped")
441438
}
442439

443-
func NewRollbackMitigation(client Client, config *config.Dcp, vbIds []uint16, bus EventBus.Bus) RollbackMitigation {
440+
func NewRollbackMitigation(client Client,
441+
config *config.Dcp,
442+
vbIds []uint16,
443+
persistSeqNoDispatcher models.PersistSeqNoDispatcher,
444+
) RollbackMitigation {
444445
return &rollbackMitigation{
445-
client: client,
446-
config: config,
447-
vbIds: vbIds,
448-
bus: bus,
449-
observeCount: &atomic.Uint32{},
450-
observeCloseCh: make(chan struct{}, 1),
451-
observeCloseDoneCh: make(chan struct{}, 1),
446+
client: client,
447+
config: config,
448+
vbIds: vbIds,
449+
observeCount: &atomic.Uint32{},
450+
observeCloseCh: make(chan struct{}, 1),
451+
observeCloseDoneCh: make(chan struct{}, 1),
452+
persistSeqNoDispatcher: persistSeqNoDispatcher,
452453
}
453454
}

dcp.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ func (s *dcp) Start() {
125125

126126
s.stream = stream.NewStream(
127127
s.client, s.metadata, s.config, s.version, s.bucketInfo, s.vBucketDiscovery,
128-
s.consumer, collectionIDs, s.stopCh, s.bus, s.eventHandler,
129-
tc,
128+
s.consumer, collectionIDs, s.stopCh, s.eventHandler, tc,
130129
)
131130

132131
if s.config.LeaderElection.Enabled {

helpers/constants.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ const (
66
Prefix string = "_connector:" + Name + ":"
77
TxnPrefix string = "_txn:"
88

9-
MembershipChangedBusEventName string = "membershipChanged"
10-
PersistSeqNoChangedBusEventName string = "persistSeqNoChanged"
9+
MembershipChangedBusEventName string = "membershipChanged"
1110

1211
JSONFlags uint32 = 50333696
1312
MaxIntValue uint64 = 0xffffffffffffffff

models/listeners.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ type DcpStreamEndContext struct {
2222
}
2323

2424
type (
25-
Listener func(*ListenerContext)
26-
ListenerCh chan ListenerArgs
27-
ListenerEndCh chan DcpStreamEndContext
25+
Listener func(*ListenerContext)
26+
ListenerCh chan ListenerArgs
27+
ListenerEndCh chan DcpStreamEndContext
28+
PersistSeqNoDispatcher func(persistSeqNo *PersistSeqNo)
2829
)
2930

3031
type Consumer interface {

stream/stream.go

+2-14
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313

1414
"github.com/Trendyol/go-dcp/membership"
1515

16-
"github.com/asaskevich/EventBus"
17-
1816
"github.com/couchbase/gocbcore/v10"
1917

2018
"github.com/Trendyol/go-dcp/wrapper"
@@ -56,7 +54,6 @@ type stream struct {
5654
checkpoint Checkpoint
5755
rollbackMitigation couchbase.RollbackMitigation
5856
vBucketDiscovery VBucketDiscovery
59-
bus EventBus.Bus
6057
eventHandler models.EventHandler
6158
config *config.Dcp
6259
metric *Metric
@@ -239,7 +236,7 @@ func (s *stream) Open() {
239236
logger.Log.Info("rollback mitigation is disabled for ephemeral bucket")
240237
s.config.RollbackMitigation.Disabled = true
241238
} else {
242-
s.rollbackMitigation = couchbase.NewRollbackMitigation(s.client, s.config, vbIDs, s.bus)
239+
s.rollbackMitigation = couchbase.NewRollbackMitigation(s.client, s.config, vbIDs, s.dispatchPersistSeqNo)
243240
s.rollbackMitigation.Start()
244241
}
245242
}
@@ -329,7 +326,7 @@ func (s *stream) Save() {
329326
s.checkpoint.Save()
330327
}
331328

332-
func (s *stream) dispatchPersistSeqNo(persistSeqNo models.PersistSeqNo) {
329+
func (s *stream) dispatchPersistSeqNo(persistSeqNo *models.PersistSeqNo) {
333330
if s.observers != nil {
334331
if observer, ok := s.observers.Load(persistSeqNo.VbID); ok {
335332
observer.SetPersistSeqNo(persistSeqNo.SeqNo)
@@ -482,7 +479,6 @@ func NewStream(client couchbase.Client,
482479
consumer models.Consumer,
483480
collectionIDs map[uint32]string,
484481
stopCh chan struct{},
485-
bus EventBus.Bus,
486482
eventHandler models.EventHandler,
487483
tc *tracing.TracerComponent,
488484
) Stream {
@@ -497,7 +493,6 @@ func NewStream(client couchbase.Client,
497493
finishStreamWithCloseCh: make(chan struct{}, 1),
498494
finishStreamWithEndEventCh: make(chan struct{}, 1),
499495
stopCh: stopCh,
500-
bus: bus,
501496
eventHandler: eventHandler,
502497
metric: &Metric{},
503498
tracerComponent: tc,
@@ -510,12 +505,5 @@ func NewStream(client couchbase.Client,
510505
}
511506
}
512507

513-
// there is a single topic for persistSeqNo events related to all vBuckets
514-
// the listener is responsible for dispatching the event to the correct observer
515-
if err := bus.Subscribe(helpers.PersistSeqNoChangedBusEventName, stream.dispatchPersistSeqNo); err != nil {
516-
logger.Log.Error("cannot subscribe to persistSeqNoChanged event, err: %v", err)
517-
panic(err)
518-
}
519-
520508
return stream
521509
}

0 commit comments

Comments
 (0)