use the source channel name in the task position
Signed-off-by: SimFG <bang.fu@zilliz.com>
SimFG committed Oct 22, 2024
1 parent 5992c40 commit 901b16b
Showing 6 changed files with 38 additions and 32 deletions.
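
At a high level, this commit threads the source physical channel (pchannel) name through api.ReplicateMsg and the replicate channel handler, so that on the server side the task's source position, write metrics, and collection-position updates are keyed by the source channel name carried on the message, while the target position keeps the pchannel derived from the pack's end position. Hedged illustrative sketches follow some of the file diffs below.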
6 changes: 4 additions & 2 deletions core/api/replicate_msg.go
@@ -23,16 +23,18 @@ import "github.com/milvus-io/milvus/pkg/mq/msgstream"
var EmptyMsgPack = &ReplicateMsg{}

type ReplicateMsg struct {
// source collection info
// source collection and channel info
CollectionName string
CollectionID int64
PChannelName string
MsgPack *msgstream.MsgPack
}

func GetReplicateMsg(collectionName string, collectionID int64, msgPack *msgstream.MsgPack) *ReplicateMsg {
func GetReplicateMsg(pchannelName string, collectionName string, collectionID int64, msgPack *msgstream.MsgPack) *ReplicateMsg {
return &ReplicateMsg{
CollectionName: collectionName,
CollectionID: collectionID,
PChannelName: pchannelName,
MsgPack: msgPack,
}
}
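
For illustration, here is a minimal, self-contained sketch of the widened constructor. The types are simplified local mirrors (the real ReplicateMsg wraps *msgstream.MsgPack from the Milvus pkg), and the channel and collection values are hypothetical:

```go
package main

import "fmt"

// Simplified local mirror of the MsgPack carried by a ReplicateMsg.
type MsgPack struct{ EndTs uint64 }

// Simplified mirror of api.ReplicateMsg after this commit.
type ReplicateMsg struct {
	// source collection and channel info
	CollectionName string
	CollectionID   int64
	PChannelName   string
	MsgPack        *MsgPack
}

// GetReplicateMsg now takes the source pchannel name as its first argument
// and records it on the message.
func GetReplicateMsg(pchannelName, collectionName string, collectionID int64, msgPack *MsgPack) *ReplicateMsg {
	return &ReplicateMsg{
		CollectionName: collectionName,
		CollectionID:   collectionID,
		PChannelName:   pchannelName,
		MsgPack:        msgPack,
	}
}

func main() {
	// Hypothetical values, for illustration only.
	msg := GetReplicateMsg("by-dev-rootcoord-dml_0", "demo_collection", 451234, &MsgPack{EndTs: 1})
	fmt.Println(msg.PChannelName, msg.CollectionName, msg.CollectionID)
}
```

Carrying the pchannel on the message is what later lets the server key the task position by the source channel without re-deriving it from the pack's positions.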
6 changes: 6 additions & 0 deletions core/model/reader.go
@@ -33,6 +33,12 @@ type SourceCollectionInfo struct {
ShardNum int
}

// source collection info in the handler
type HandlerCollectionInfo struct {
CollectionID int64
PChannel string
}

type TargetCollectionInfo struct {
DatabaseName string
CollectionID int64
10 changes: 0 additions & 10 deletions core/reader/collection_reader.go
@@ -87,16 +87,6 @@ func NewCollectionReader(id string,
errChan: make(chan error),
retryOptions: util.GetRetryOptions(readerConfig.Retry),
}
// for _, collectionPositions := range seekPosition {
// for channel, msgPosition := range collectionPositions {
// pchannel := channel
// if IsVirtualChannel(pchannel) {
// pchannel = funcutil.ToPhysicalChannel(pchannel)
// }
// // TODO how to use the target channel
// GetTSManager().CollectTS(pchannel, msgPosition.GetTimestamp())
// }
// }
return reader, nil
}

29 changes: 18 additions & 11 deletions core/reader/replicate_channel_manager.go
@@ -839,8 +839,8 @@ type replicateChannelHandler struct {

// key: source milvus collectionID value: *model.TargetCollectionInfo
recordLock deadlock.RWMutex
collectionRecords map[int64]*model.TargetCollectionInfo // key is suorce collection id
collectionNames map[string]int64
collectionRecords map[int64]*model.TargetCollectionInfo // key is suorce collection id
collectionNames map[string]*model.HandlerCollectionInfo // key is collection name, value is the source brief collection info
closeStreamFuncs map[int64]io.Closer

forwardPackChan chan *api.ReplicateMsg
@@ -878,7 +878,10 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti
}
r.recordLock.Lock()
r.collectionRecords[collectionID] = targetInfo
r.collectionNames[targetInfo.CollectionName] = collectionID
r.collectionNames[targetInfo.CollectionName] = &model.HandlerCollectionInfo{
CollectionID: collectionID,
PChannel: sourceInfo.PChannel,
}
r.closeStreamFuncs[collectionID] = closeStreamFunc
go func() {
log.Info("start to handle the msg pack", zap.String("channel_name", sourceInfo.VChannel))
@@ -893,7 +896,7 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti
return
}

r.innerHandleReplicateMsg(false, api.GetReplicateMsg(targetInfo.CollectionName, collectionID, msgPack))
r.innerHandleReplicateMsg(false, api.GetReplicateMsg(sourceInfo.PChannel, targetInfo.CollectionName, collectionID, msgPack))
}
}
}()
@@ -941,7 +944,7 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti
},
},
}
r.generatePackChan <- api.GetReplicateMsg(targetInfo.CollectionName, collectionID, generateMsgPack)
r.generatePackChan <- api.GetReplicateMsg(sourceInfo.PChannel, targetInfo.CollectionName, collectionID, generateMsgPack)
dropCollectionLog.Info("has generate msg for dropped collection")
return struct{}{}, nil
})
@@ -1002,6 +1005,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
return nil
}
targetInfo.PartitionBarrierChan[partitionID] = util.NewOnceWriteChan(barrierChan)
sourcePChannel := r.collectionNames[collectionName].PChannel
partitionLog.Info("add partition info done")
r.recordLock.Unlock()

@@ -1047,7 +1051,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
},
},
}
r.generatePackChan <- api.GetReplicateMsg(targetInfo.CollectionName, collectionID, generateMsgPack)
r.generatePackChan <- api.GetReplicateMsg(sourcePChannel, collectionName, collectionID, generateMsgPack)
partitionLog.Info("has generate msg for dropped partition")
return struct{}{}, nil
})
@@ -1149,6 +1153,7 @@ func (r *replicateChannelHandler) innerHandleReplicateMsg(forward bool, msg *api
}
p.CollectionID = msg.CollectionID
p.CollectionName = msg.CollectionName
p.PChannelName = msg.PChannelName
GetTSManager().SendTargetMsg(r.targetPChannel, p)
}

@@ -1232,7 +1237,7 @@ func (r *replicateChannelHandler) getCollectionTargetInfo(collectionID int64) (*
func (r *replicateChannelHandler) containCollection(collectionName string) bool {
r.recordLock.RLock()
defer r.recordLock.RUnlock()
return r.collectionNames[collectionName] != 0
return r.collectionNames[collectionName] != nil
}

func (r *replicateChannelHandler) getPartitionID(sourceCollectionID, sourcePartitionID int64, info *model.TargetCollectionInfo, name string) (int64, error) {
@@ -1340,6 +1345,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
sourceCollectionID := int64(-1)
sourceCollectionName := ""
forwardChannel := ""
streamPChannel := ""

for _, msg := range pack.Msgs {
if forward {
@@ -1507,6 +1513,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}
originPosition := msg.Position()
originPositionPChannel := funcutil.ToPhysicalChannel(originPosition.GetChannelName())
streamPChannel = originPositionPChannel
positionChannel := info.PChannel
if IsVirtualChannel(originPosition.GetChannelName()) {
positionChannel = info.VChannel
@@ -1544,7 +1551,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}

if forwardChannel != "" {
r.forwardMsgFunc(forwardChannel, api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack))
r.forwardMsgFunc(forwardChannel, api.GetReplicateMsg(streamPChannel, sourceCollectionName, sourceCollectionID, newPack))
return api.EmptyMsgPack
}

@@ -1583,7 +1590,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
resetLastTs := needTsMsg
needTsMsg = needTsMsg || len(newPack.Msgs) == 0
if !needTsMsg {
return api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack)
return api.GetReplicateMsg("", sourceCollectionName, sourceCollectionID, newPack)
}
timeTickResult := &msgpb.TimeTickMsg{
Base: commonpbutil.NewMsgBase(
@@ -1608,7 +1615,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
msgTime, _ := tsoutil.ParseHybridTs(generateTS)
TSMetricVec.WithLabelValues(r.targetPChannel).Set(float64(msgTime))
r.ttRateLog.Debug("time tick msg", zap.String("channel", r.targetPChannel), zap.Uint64("max_ts", generateTS))
return api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack)
return api.GetReplicateMsg("", sourceCollectionName, sourceCollectionID, newPack)
}

func resetMsgPackTimestamp(pack *msgstream.MsgPack, newTimestamp uint64) bool {
@@ -1757,7 +1764,7 @@ func initReplicateChannelHandler(ctx context.Context,
metaOp: metaOp,
streamCreator: streamCreator,
collectionRecords: make(map[int64]*model.TargetCollectionInfo),
collectionNames: make(map[string]int64),
collectionNames: make(map[string]*model.HandlerCollectionInfo),
closeStreamFuncs: make(map[int64]io.Closer),
apiEventChan: apiEventChan,
forwardPackChan: make(chan *api.ReplicateMsg, opts.MessageBufferSize),
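
A minimal sketch of the handler-side bookkeeping after this change: collectionNames now maps a collection name to a small HandlerCollectionInfo record instead of a bare int64, so both the source collection ID and its pchannel can be looked up by name, and containCollection becomes a nil check. The types are simplified local mirrors, the sourcePChannel helper is hypothetical (the real code indexes the map directly under a deadlock.RWMutex, omitted here), and the sample values are made up:

```go
package main

import "fmt"

// Simplified mirror of the new model.HandlerCollectionInfo.
type HandlerCollectionInfo struct {
	CollectionID int64
	PChannel     string
}

// Simplified handler; the real replicateChannelHandler guards this map
// with a deadlock.RWMutex, omitted here for brevity.
type handler struct {
	collectionNames map[string]*HandlerCollectionInfo // key: collection name
}

// containCollection becomes a nil check, since the map value is now a
// pointer rather than an int64 collection ID.
func (h *handler) containCollection(name string) bool {
	return h.collectionNames[name] != nil
}

// sourcePChannel recovers the source pchannel recorded when the collection
// was added, which the handler passes along when it generates packs
// (for example, for dropped partitions).
func (h *handler) sourcePChannel(name string) (string, bool) {
	info := h.collectionNames[name]
	if info == nil {
		return "", false
	}
	return info.PChannel, true
}

func main() {
	h := &handler{collectionNames: map[string]*HandlerCollectionInfo{
		// Hypothetical entry, for illustration only.
		"demo_collection": {CollectionID: 451234, PChannel: "by-dev-rootcoord-dml_0"},
	}}
	fmt.Println(h.containCollection("demo_collection"))
	fmt.Println(h.sourcePChannel("missing_collection"))
}
```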
17 changes: 9 additions & 8 deletions server/cdc_impl.go
@@ -958,23 +958,24 @@ func (e *MetaCDC) startReplicateDMLMsg(replicateCtx context.Context, info *meta.
_ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, invalid collection name or id", []meta.TaskState{})
return
}
pChannel := msgPack.EndPositions[0].GetChannelName()
if cdcreader.IsVirtualChannel(pChannel) {
pChannel = funcutil.ToPhysicalChannel(pChannel)
streamChannelName := replicateMsg.PChannelName
targetPChannel := msgPack.EndPositions[0].GetChannelName()
if cdcreader.IsVirtualChannel(targetPChannel) {
targetPChannel = funcutil.ToPhysicalChannel(targetPChannel)
}
position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, pChannel, msgPack)
position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, targetPChannel, msgPack)
if err != nil {
taskLog.Warn("fail to handle the replicate message", zap.Any("pack", msgPack), zap.Error(err))
_ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, err:"+err.Error(), []meta.TaskState{})
return
}
msgTime, _ := tsoutil.ParseHybridTs(msgPack.EndTs)
replicateMetric(info, channelName, msgPack, metrics.OPTypeWrite)
replicateMetric(info, streamChannelName, msgPack, metrics.OPTypeWrite)

metaPosition := &meta.PositionInfo{
Time: msgTime,
DataPair: &commonpb.KeyDataPair{
Key: channelName,
Key: streamChannelName,
Data: position,
},
}
@@ -986,14 +987,14 @@ func (e *MetaCDC) startReplicateDMLMsg(replicateCtx context.Context, info *meta.
metaTargetPosition := &meta.PositionInfo{
Time: msgTime,
DataPair: &commonpb.KeyDataPair{
Key: pChannel,
Key: targetPChannel,
Data: targetPosition,
},
}
if position != nil {
msgCollectionName := replicateMsg.CollectionName
msgCollectionID := replicateMsg.CollectionID
err = writeCallback.UpdateTaskCollectionPosition(msgCollectionID, msgCollectionName, channelName,
err = writeCallback.UpdateTaskCollectionPosition(msgCollectionID, msgCollectionName, streamChannelName,
metaPosition, metaOpPosition, metaTargetPosition)
if err != nil {
log.Warn("fail to update the collection position", zap.Any("pack", msgPack), zap.Error(err))
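
To make the renaming concrete, here is a small sketch of how the two positions end up keyed in startReplicateDMLMsg after this change. PositionInfo is a simplified stand-in for meta.PositionInfo, and the channel names are hypothetical:

```go
package main

import "fmt"

// Simplified stand-in for meta.PositionInfo: just enough to show which
// channel name keys which position.
type PositionInfo struct {
	Time int64
	Key  string
	Data []byte
}

func main() {
	// Hypothetical channel names, for illustration only.
	streamChannelName := "by-dev-rootcoord-dml_0" // replicateMsg.PChannelName: the source pchannel
	targetPChannel := "target-rootcoord-dml_1"    // physical form of msgPack.EndPositions[0].GetChannelName()

	// The source-side position (along with the write metric and the
	// collection-position update) is now keyed by the source channel name...
	metaPosition := PositionInfo{Time: 1, Key: streamChannelName, Data: []byte("source position bytes")}
	// ...while the target-side position stays keyed by the target pchannel.
	metaTargetPosition := PositionInfo{Time: 1, Key: targetPChannel, Data: []byte("target position bytes")}

	fmt.Println("source position key:", metaPosition.Key)
	fmt.Println("target position key:", metaTargetPosition.Key)
}
```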
2 changes: 1 addition & 1 deletion server/writer_callback.go
@@ -61,7 +61,7 @@ func (w *WriteCallback) UpdateTaskCollectionPosition(collectionID int64, collect
if err != nil {
w.log.Warn("fail to update the collection position",
zap.Int64("collection_id", collectionID),
zap.String("vchannel_name", pChannelName),
zap.String("pchannel_name", pChannelName),
zap.String("position", util.Base64JSON(position)),
zap.Error(err))
return err
