Skip to content

Commit e83f50e

Browse files
fengshaobao 00231050jpbetz
fengshaobao 00231050
authored andcommitted
mvcc: sending events after restore
Fixes: #8411
1 parent 232a81d commit e83f50e

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

mvcc/watchable_store.go

+15
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,21 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
258258
s.mu.Unlock()
259259
}
260260

261+
func (s *watchableStore) Restore(b backend.Backend) error {
262+
s.mu.Lock()
263+
defer s.mu.Unlock()
264+
err := s.store.Restore(b)
265+
if err != nil {
266+
return err
267+
}
268+
269+
for wa := range s.synced.watchers {
270+
s.unsynced.watchers.add(wa)
271+
}
272+
s.synced = newWatcherGroup()
273+
return nil
274+
}
275+
261276
// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
262277
func (s *watchableStore) syncWatchersLoop() {
263278
defer s.wg.Done()

mvcc/watchable_store_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,39 @@ func TestWatchFutureRev(t *testing.T) {
294294
}
295295
}
296296

297+
func TestWatchRestore(t *testing.T) {
298+
b, tmpPath := backend.NewDefaultTmpBackend()
299+
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
300+
defer cleanup(s, b, tmpPath)
301+
302+
testKey := []byte("foo")
303+
testValue := []byte("bar")
304+
rev := s.Put(testKey, testValue, lease.NoLease)
305+
306+
newBackend, newPath := backend.NewDefaultTmpBackend()
307+
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
308+
defer cleanup(newStore, newBackend, newPath)
309+
310+
w := newStore.NewWatchStream()
311+
w.Watch(testKey, nil, rev-1)
312+
313+
newStore.Restore(b)
314+
select {
315+
case resp := <-w.Chan():
316+
if resp.Revision != rev {
317+
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
318+
}
319+
if len(resp.Events) != 1 {
320+
t.Fatalf("failed to get events from the response")
321+
}
322+
if resp.Events[0].Kv.ModRevision != rev {
323+
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
324+
}
325+
case <-time.After(time.Second):
326+
t.Fatal("failed to receive event in 1 second.")
327+
}
328+
}
329+
297330
// TestWatchBatchUnsynced tests batching on unsynced watchers
298331
func TestWatchBatchUnsynced(t *testing.T) {
299332
b, tmpPath := backend.NewDefaultTmpBackend()

0 commit comments

Comments
 (0)