diff --git a/core/api/replicate_msg.go b/core/api/replicate_msg.go index 5173ed32..f698d3d9 100644 --- a/core/api/replicate_msg.go +++ b/core/api/replicate_msg.go @@ -23,16 +23,18 @@ import "github.com/milvus-io/milvus/pkg/mq/msgstream" var EmptyMsgPack = &ReplicateMsg{} type ReplicateMsg struct { - // source collection info + // source collection and channel info CollectionName string CollectionID int64 + PChannelName string MsgPack *msgstream.MsgPack } -func GetReplicateMsg(collectionName string, collectionID int64, msgPack *msgstream.MsgPack) *ReplicateMsg { +func GetReplicateMsg(pchannelName string, collectionName string, collectionID int64, msgPack *msgstream.MsgPack) *ReplicateMsg { return &ReplicateMsg{ CollectionName: collectionName, CollectionID: collectionID, + PChannelName: pchannelName, MsgPack: msgPack, } } diff --git a/core/model/reader.go b/core/model/reader.go index 0b0121cb..69f9792e 100644 --- a/core/model/reader.go +++ b/core/model/reader.go @@ -33,6 +33,12 @@ type SourceCollectionInfo struct { ShardNum int } +// source collection info in the handler +type HandlerCollectionInfo struct { + CollectionID int64 + PChannel string +} + type TargetCollectionInfo struct { DatabaseName string CollectionID int64 diff --git a/core/reader/collection_reader.go b/core/reader/collection_reader.go index 425be3f0..9d6234c0 100644 --- a/core/reader/collection_reader.go +++ b/core/reader/collection_reader.go @@ -87,16 +87,6 @@ func NewCollectionReader(id string, errChan: make(chan error), retryOptions: util.GetRetryOptions(readerConfig.Retry), } - // for _, collectionPositions := range seekPosition { - // for channel, msgPosition := range collectionPositions { - // pchannel := channel - // if IsVirtualChannel(pchannel) { - // pchannel = funcutil.ToPhysicalChannel(pchannel) - // } - // // TODO how to use the target channel - // GetTSManager().CollectTS(pchannel, msgPosition.GetTimestamp()) - // } - // } return reader, nil } diff --git a/core/reader/replicate_channel_manager.go b/core/reader/replicate_channel_manager.go index b504646e..0d4838a4 100644 --- a/core/reader/replicate_channel_manager.go +++ b/core/reader/replicate_channel_manager.go @@ -839,8 +839,8 @@ type replicateChannelHandler struct { // key: source milvus collectionID value: *model.TargetCollectionInfo recordLock deadlock.RWMutex - collectionRecords map[int64]*model.TargetCollectionInfo // key is suorce collection id - collectionNames map[string]int64 + collectionRecords map[int64]*model.TargetCollectionInfo // key is suorce collection id + collectionNames map[string]*model.HandlerCollectionInfo // key is collection name, value is the source brief collection info closeStreamFuncs map[int64]io.Closer forwardPackChan chan *api.ReplicateMsg @@ -878,7 +878,10 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti } r.recordLock.Lock() r.collectionRecords[collectionID] = targetInfo - r.collectionNames[targetInfo.CollectionName] = collectionID + r.collectionNames[targetInfo.CollectionName] = &model.HandlerCollectionInfo{ + CollectionID: collectionID, + PChannel: sourceInfo.PChannel, + } r.closeStreamFuncs[collectionID] = closeStreamFunc go func() { log.Info("start to handle the msg pack", zap.String("channel_name", sourceInfo.VChannel)) @@ -893,7 +896,7 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti return } - r.innerHandleReplicateMsg(false, api.GetReplicateMsg(targetInfo.CollectionName, collectionID, msgPack)) + r.innerHandleReplicateMsg(false, api.GetReplicateMsg(sourceInfo.PChannel, targetInfo.CollectionName, collectionID, msgPack)) } } }() @@ -941,7 +944,7 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti }, }, } - r.generatePackChan <- api.GetReplicateMsg(targetInfo.CollectionName, collectionID, generateMsgPack) + r.generatePackChan <- api.GetReplicateMsg(sourceInfo.PChannel, targetInfo.CollectionName, collectionID, generateMsgPack) dropCollectionLog.Info("has generate msg for dropped collection") return struct{}{}, nil }) @@ -1002,6 +1005,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection return nil } targetInfo.PartitionBarrierChan[partitionID] = util.NewOnceWriteChan(barrierChan) + sourcePChannel := r.collectionNames[collectionName].PChannel partitionLog.Info("add partition info done") r.recordLock.Unlock() @@ -1047,7 +1051,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection }, }, } - r.generatePackChan <- api.GetReplicateMsg(targetInfo.CollectionName, collectionID, generateMsgPack) + r.generatePackChan <- api.GetReplicateMsg(sourcePChannel, collectionName, collectionID, generateMsgPack) partitionLog.Info("has generate msg for dropped partition") return struct{}{}, nil }) @@ -1149,6 +1153,7 @@ func (r *replicateChannelHandler) innerHandleReplicateMsg(forward bool, msg *api } p.CollectionID = msg.CollectionID p.CollectionName = msg.CollectionName + p.PChannelName = msg.PChannelName GetTSManager().SendTargetMsg(r.targetPChannel, p) } @@ -1232,7 +1237,7 @@ func (r *replicateChannelHandler) getCollectionTargetInfo(collectionID int64) (* func (r *replicateChannelHandler) containCollection(collectionName string) bool { r.recordLock.RLock() defer r.recordLock.RUnlock() - return r.collectionNames[collectionName] != 0 + return r.collectionNames[collectionName] != nil } func (r *replicateChannelHandler) getPartitionID(sourceCollectionID, sourcePartitionID int64, info *model.TargetCollectionInfo, name string) (int64, error) { @@ -1340,6 +1345,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa sourceCollectionID := int64(-1) sourceCollectionName := "" forwardChannel := "" + streamPChannel := "" for _, msg := range pack.Msgs { if forward { @@ -1507,6 +1513,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa } originPosition := msg.Position() originPositionPChannel := funcutil.ToPhysicalChannel(originPosition.GetChannelName()) + streamPChannel = originPositionPChannel positionChannel := info.PChannel if IsVirtualChannel(originPosition.GetChannelName()) { positionChannel = info.VChannel @@ -1544,7 +1551,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa } if forwardChannel != "" { - r.forwardMsgFunc(forwardChannel, api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack)) + r.forwardMsgFunc(forwardChannel, api.GetReplicateMsg(streamPChannel, sourceCollectionName, sourceCollectionID, newPack)) return api.EmptyMsgPack } @@ -1583,7 +1590,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa resetLastTs := needTsMsg needTsMsg = needTsMsg || len(newPack.Msgs) == 0 if !needTsMsg { - return api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack) + return api.GetReplicateMsg("", sourceCollectionName, sourceCollectionID, newPack) } timeTickResult := &msgpb.TimeTickMsg{ Base: commonpbutil.NewMsgBase( @@ -1608,7 +1615,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa msgTime, _ := tsoutil.ParseHybridTs(generateTS) TSMetricVec.WithLabelValues(r.targetPChannel).Set(float64(msgTime)) r.ttRateLog.Debug("time tick msg", zap.String("channel", r.targetPChannel), zap.Uint64("max_ts", generateTS)) - return api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack) + return api.GetReplicateMsg("", sourceCollectionName, sourceCollectionID, newPack) } func resetMsgPackTimestamp(pack *msgstream.MsgPack, newTimestamp uint64) bool { @@ -1757,7 +1764,7 @@ func initReplicateChannelHandler(ctx context.Context, metaOp: metaOp, streamCreator: streamCreator, collectionRecords: make(map[int64]*model.TargetCollectionInfo), - collectionNames: make(map[string]int64), + collectionNames: make(map[string]*model.HandlerCollectionInfo), closeStreamFuncs: make(map[int64]io.Closer), apiEventChan: apiEventChan, forwardPackChan: make(chan *api.ReplicateMsg, opts.MessageBufferSize), diff --git a/server/cdc_impl.go b/server/cdc_impl.go index e686679b..78c2866b 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -958,23 +958,24 @@ func (e *MetaCDC) startReplicateDMLMsg(replicateCtx context.Context, info *meta. _ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, invalid collection name or id", []meta.TaskState{}) return } - pChannel := msgPack.EndPositions[0].GetChannelName() - if cdcreader.IsVirtualChannel(pChannel) { - pChannel = funcutil.ToPhysicalChannel(pChannel) + streamChannelName := replicateMsg.PChannelName + targetPChannel := msgPack.EndPositions[0].GetChannelName() + if cdcreader.IsVirtualChannel(targetPChannel) { + targetPChannel = funcutil.ToPhysicalChannel(targetPChannel) } - position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, pChannel, msgPack) + position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, targetPChannel, msgPack) if err != nil { taskLog.Warn("fail to handle the replicate message", zap.Any("pack", msgPack), zap.Error(err)) _ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, err:"+err.Error(), []meta.TaskState{}) return } msgTime, _ := tsoutil.ParseHybridTs(msgPack.EndTs) - replicateMetric(info, channelName, msgPack, metrics.OPTypeWrite) + replicateMetric(info, streamChannelName, msgPack, metrics.OPTypeWrite) metaPosition := &meta.PositionInfo{ Time: msgTime, DataPair: &commonpb.KeyDataPair{ - Key: channelName, + Key: streamChannelName, Data: position, }, } @@ -986,14 +987,14 @@ func (e *MetaCDC) startReplicateDMLMsg(replicateCtx context.Context, info *meta. metaTargetPosition := &meta.PositionInfo{ Time: msgTime, DataPair: &commonpb.KeyDataPair{ - Key: pChannel, + Key: targetPChannel, Data: targetPosition, }, } if position != nil { msgCollectionName := replicateMsg.CollectionName msgCollectionID := replicateMsg.CollectionID - err = writeCallback.UpdateTaskCollectionPosition(msgCollectionID, msgCollectionName, channelName, + err = writeCallback.UpdateTaskCollectionPosition(msgCollectionID, msgCollectionName, streamChannelName, metaPosition, metaOpPosition, metaTargetPosition) if err != nil { log.Warn("fail to update the collection position", zap.Any("pack", msgPack), zap.Error(err)) diff --git a/server/writer_callback.go b/server/writer_callback.go index 3cf63788..c8711095 100644 --- a/server/writer_callback.go +++ b/server/writer_callback.go @@ -61,7 +61,7 @@ func (w *WriteCallback) UpdateTaskCollectionPosition(collectionID int64, collect if err != nil { w.log.Warn("fail to update the collection position", zap.Int64("collection_id", collectionID), - zap.String("vchannel_name", pChannelName), + zap.String("pchannel_name", pChannelName), zap.String("position", util.Base64JSON(position)), zap.Error(err)) return err