@@ -122,12 +122,12 @@ func (so *observer) waitRollbackMitigation(seqNo uint64) {
122
122
}
123
123
}
124
124
125
- func (so * observer ) canForward (seqNo uint64 ) bool {
125
+ func (so * observer ) canForward (seqNo uint64 , isControl bool ) bool {
126
126
if ! so .config .RollbackMitigation .Disabled {
127
127
so .waitRollbackMitigation (seqNo )
128
128
}
129
129
130
- return ! so .needCatchup (seqNo )
130
+ return isControl || ! so .needCatchup (seqNo )
131
131
}
132
132
133
133
func (so * observer ) isBeforeSkipWindow (eventTime time.Time ) bool {
@@ -166,7 +166,7 @@ func (so *observer) sendOrSkip(args models.ListenerArgs) {
166
166
}
167
167
168
168
func (so * observer ) SnapshotMarker (event models.DcpSnapshotMarker ) {
169
- if ! so .canForward (event .StartSeqNo ) {
169
+ if ! so .canForward (event .StartSeqNo , true ) {
170
170
return
171
171
}
172
172
@@ -192,7 +192,7 @@ func (so *observer) IsInSnapshotMarker(seqNo uint64) bool {
192
192
}
193
193
194
194
func (so * observer ) Mutation (event gocbcore.DcpMutation ) { //nolint:dupl
195
- if ! so .canForward (event .SeqNo ) {
195
+ if ! so .canForward (event .SeqNo , false ) {
196
196
return
197
197
}
198
198
@@ -220,7 +220,7 @@ func (so *observer) Mutation(event gocbcore.DcpMutation) { //nolint:dupl
220
220
}
221
221
222
222
func (so * observer ) Deletion (event gocbcore.DcpDeletion ) { //nolint:dupl
223
- if ! so .canForward (event .SeqNo ) {
223
+ if ! so .canForward (event .SeqNo , false ) {
224
224
return
225
225
}
226
226
@@ -248,7 +248,7 @@ func (so *observer) Deletion(event gocbcore.DcpDeletion) { //nolint:dupl
248
248
}
249
249
250
250
func (so * observer ) Expiration (event gocbcore.DcpExpiration ) { //nolint:dupl
251
- if ! so .canForward (event .SeqNo ) {
251
+ if ! so .canForward (event .SeqNo , false ) {
252
252
return
253
253
}
254
254
@@ -288,7 +288,7 @@ func (so *observer) End(event models.DcpStreamEnd, err error) {
288
288
}
289
289
290
290
func (so * observer ) CreateCollection (event gocbcore.DcpCollectionCreation ) {
291
- if ! so .canForward (event .SeqNo ) {
291
+ if ! so .canForward (event .SeqNo , false ) {
292
292
return
293
293
}
294
294
@@ -308,7 +308,7 @@ func (so *observer) CreateCollection(event gocbcore.DcpCollectionCreation) {
308
308
}
309
309
310
310
func (so * observer ) DeleteCollection (event gocbcore.DcpCollectionDeletion ) {
311
- if ! so .canForward (event .SeqNo ) {
311
+ if ! so .canForward (event .SeqNo , false ) {
312
312
return
313
313
}
314
314
@@ -328,7 +328,7 @@ func (so *observer) DeleteCollection(event gocbcore.DcpCollectionDeletion) {
328
328
}
329
329
330
330
func (so * observer ) FlushCollection (event gocbcore.DcpCollectionFlush ) {
331
- if ! so .canForward (event .SeqNo ) {
331
+ if ! so .canForward (event .SeqNo , false ) {
332
332
return
333
333
}
334
334
@@ -348,7 +348,7 @@ func (so *observer) FlushCollection(event gocbcore.DcpCollectionFlush) {
348
348
}
349
349
350
350
func (so * observer ) CreateScope (event gocbcore.DcpScopeCreation ) {
351
- if ! so .canForward (event .SeqNo ) {
351
+ if ! so .canForward (event .SeqNo , false ) {
352
352
return
353
353
}
354
354
@@ -367,7 +367,7 @@ func (so *observer) CreateScope(event gocbcore.DcpScopeCreation) {
367
367
}
368
368
369
369
func (so * observer ) DeleteScope (event gocbcore.DcpScopeDeletion ) {
370
- if ! so .canForward (event .SeqNo ) {
370
+ if ! so .canForward (event .SeqNo , false ) {
371
371
return
372
372
}
373
373
@@ -386,7 +386,7 @@ func (so *observer) DeleteScope(event gocbcore.DcpScopeDeletion) {
386
386
}
387
387
388
388
func (so * observer ) ModifyCollection (event gocbcore.DcpCollectionModification ) {
389
- if ! so .canForward (event .SeqNo ) {
389
+ if ! so .canForward (event .SeqNo , false ) {
390
390
return
391
391
}
392
392
@@ -412,7 +412,7 @@ func (so *observer) OSOSnapshot(event gocbcore.DcpOSOSnapshot) {
412
412
}
413
413
414
414
func (so * observer ) SeqNoAdvanced (advanced gocbcore.DcpSeqNoAdvanced ) {
415
- if ! so .canForward (advanced .SeqNo ) {
415
+ if ! so .canForward (advanced .SeqNo , true ) {
416
416
return
417
417
}
418
418
0 commit comments