Skip to content

Commit

Permalink
Fix bug for etcd compaction causes the metaTable pointer to be differ…
Browse files Browse the repository at this point in the history
…ent (#18418)

Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 authored Jul 26, 2022
1 parent 63fd633 commit 1b33c73
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 3 deletions.
5 changes: 4 additions & 1 deletion internal/indexcoord/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func newIndexBuilder(ctx context.Context, ic *IndexCoord, metaTable *metaTable,
cancel: cancel,
meta: metaTable,
ic: ic,
tasks: make(map[int64]indexTaskState, 1024),
notify: make(chan struct{}, 1024),
scheduleDuration: time.Second * 10,
}
Expand All @@ -74,6 +73,10 @@ func (ib *indexBuilder) Stop() {
}

func (ib *indexBuilder) reloadFromKV(aliveNodes []UniqueID) {
ib.taskMutex.Lock()
defer ib.taskMutex.Unlock()
ib.tasks = make(map[int64]indexTaskState, 1024)

metas := ib.meta.GetAllIndexMeta()
for build, indexMeta := range metas {
// deleted, need to release lock and clean meta
Expand Down
4 changes: 2 additions & 2 deletions internal/indexcoord/index_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,13 +828,13 @@ func (i *IndexCoord) watchMetaLoop() {
}
if err := resp.Err(); err != nil {
if err == v3rpc.ErrCompacted {
newMetaTable, internalErr := NewMetaTable(i.metaTable.client)
internalErr := i.metaTable.reloadFromKV()
if internalErr != nil {
log.Error("Constructing new meta table fails when etcd has a compaction error",
zap.String("path", indexFilePrefix), zap.String("etcd error", err.Error()), zap.Error(internalErr))
panic("failed to handle etcd request, exit..")
}
i.metaTable = newMetaTable
i.indexBuilder.reloadFromKV(i.nodeManager.ListAllNodes())
i.loopWg.Add(1)
go i.watchMetaLoop()
return
Expand Down
2 changes: 2 additions & 0 deletions internal/indexcoord/meta_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func NewMetaTable(kv kv.MetaKv) (*metaTable, error) {

// reloadFromKV reloads the index meta from ETCD.
func (mt *metaTable) reloadFromKV() error {
mt.lock.Lock()
defer mt.lock.Unlock()
mt.indexBuildID2Meta = make(map[UniqueID]*Meta)
key := indexFilePrefix
log.Debug("IndexCoord metaTable LoadWithPrefix ", zap.String("prefix", key))
Expand Down
12 changes: 12 additions & 0 deletions internal/indexcoord/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ func (nm *NodeManager) PeekClient(meta *Meta) (UniqueID, types.IndexNode) {
return 0, nil
}

func (nm *NodeManager) ListAllNodes() []UniqueID {
nm.lock.RLock()
defer nm.lock.RUnlock()

nodeIDs := make([]UniqueID, 0)
for nodeID := range nm.nodeClients {
nodeIDs = append(nodeIDs, nodeID)
}

return nodeIDs
}

// indexNodeGetMetricsResponse record the metrics information of IndexNode.
type indexNodeGetMetricsResponse struct {
resp *milvuspb.GetMetricsResponse
Expand Down
3 changes: 3 additions & 0 deletions internal/indexcoord/node_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,7 @@ func TestNodeManager_PeekClient(t *testing.T) {
nodeID2, client2 := nm.PeekClient(meta)
assert.Equal(t, int64(0), nodeID2)
assert.Nil(t, client2)

nodeIDs := nm.ListAllNodes()
assert.Equal(t, 1, len(nodeIDs))
}

0 comments on commit 1b33c73

Please sign in to comment.