Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use the source channel name in the task position #147

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/api/replicate_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ import "github.com/milvus-io/milvus/pkg/mq/msgstream"
var EmptyMsgPack = &ReplicateMsg{}

type ReplicateMsg struct {
// source collection info
// source collection and channel info
CollectionName string
CollectionID int64
PChannelName string
MsgPack *msgstream.MsgPack
}

func GetReplicateMsg(collectionName string, collectionID int64, msgPack *msgstream.MsgPack) *ReplicateMsg {
func GetReplicateMsg(pchannelName string, collectionName string, collectionID int64, msgPack *msgstream.MsgPack) *ReplicateMsg {
return &ReplicateMsg{
CollectionName: collectionName,
CollectionID: collectionID,
PChannelName: pchannelName,
MsgPack: msgPack,
}
}
6 changes: 6 additions & 0 deletions core/model/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type SourceCollectionInfo struct {
ShardNum int
}

// source collection info in the handler
type HandlerCollectionInfo struct {
CollectionID int64
PChannel string
}

type TargetCollectionInfo struct {
DatabaseName string
CollectionID int64
Expand Down
10 changes: 0 additions & 10 deletions core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,6 @@ func NewCollectionReader(id string,
errChan: make(chan error),
retryOptions: util.GetRetryOptions(readerConfig.Retry),
}
// for _, collectionPositions := range seekPosition {
// for channel, msgPosition := range collectionPositions {
// pchannel := channel
// if IsVirtualChannel(pchannel) {
// pchannel = funcutil.ToPhysicalChannel(pchannel)
// }
// // TODO how to use the target channel
// GetTSManager().CollectTS(pchannel, msgPosition.GetTimestamp())
// }
// }
return reader, nil
}

Expand Down
29 changes: 18 additions & 11 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,8 +839,8 @@ type replicateChannelHandler struct {

// key: source milvus collectionID value: *model.TargetCollectionInfo
recordLock deadlock.RWMutex
collectionRecords map[int64]*model.TargetCollectionInfo // key is suorce collection id
collectionNames map[string]int64
collectionRecords map[int64]*model.TargetCollectionInfo // key is suorce collection id
collectionNames map[string]*model.HandlerCollectionInfo // key is collection name, value is the source brief collection info
closeStreamFuncs map[int64]io.Closer

forwardPackChan chan *api.ReplicateMsg
Expand Down Expand Up @@ -878,7 +878,10 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti
}
r.recordLock.Lock()
r.collectionRecords[collectionID] = targetInfo
r.collectionNames[targetInfo.CollectionName] = collectionID
r.collectionNames[targetInfo.CollectionName] = &model.HandlerCollectionInfo{
CollectionID: collectionID,
PChannel: sourceInfo.PChannel,
}
r.closeStreamFuncs[collectionID] = closeStreamFunc
go func() {
log.Info("start to handle the msg pack", zap.String("channel_name", sourceInfo.VChannel))
Expand All @@ -893,7 +896,7 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti
return
}

r.innerHandleReplicateMsg(false, api.GetReplicateMsg(targetInfo.CollectionName, collectionID, msgPack))
r.innerHandleReplicateMsg(false, api.GetReplicateMsg(sourceInfo.PChannel, targetInfo.CollectionName, collectionID, msgPack))
}
}
}()
Expand Down Expand Up @@ -941,7 +944,7 @@ func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollecti
},
},
}
r.generatePackChan <- api.GetReplicateMsg(targetInfo.CollectionName, collectionID, generateMsgPack)
r.generatePackChan <- api.GetReplicateMsg(sourceInfo.PChannel, targetInfo.CollectionName, collectionID, generateMsgPack)
dropCollectionLog.Info("has generate msg for dropped collection")
return struct{}{}, nil
})
Expand Down Expand Up @@ -1002,6 +1005,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
return nil
}
targetInfo.PartitionBarrierChan[partitionID] = util.NewOnceWriteChan(barrierChan)
sourcePChannel := r.collectionNames[collectionName].PChannel
partitionLog.Info("add partition info done")
r.recordLock.Unlock()

Expand Down Expand Up @@ -1047,7 +1051,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
},
},
}
r.generatePackChan <- api.GetReplicateMsg(targetInfo.CollectionName, collectionID, generateMsgPack)
r.generatePackChan <- api.GetReplicateMsg(sourcePChannel, collectionName, collectionID, generateMsgPack)
partitionLog.Info("has generate msg for dropped partition")
return struct{}{}, nil
})
Expand Down Expand Up @@ -1149,6 +1153,7 @@ func (r *replicateChannelHandler) innerHandleReplicateMsg(forward bool, msg *api
}
p.CollectionID = msg.CollectionID
p.CollectionName = msg.CollectionName
p.PChannelName = msg.PChannelName
GetTSManager().SendTargetMsg(r.targetPChannel, p)
}

Expand Down Expand Up @@ -1232,7 +1237,7 @@ func (r *replicateChannelHandler) getCollectionTargetInfo(collectionID int64) (*
func (r *replicateChannelHandler) containCollection(collectionName string) bool {
r.recordLock.RLock()
defer r.recordLock.RUnlock()
return r.collectionNames[collectionName] != 0
return r.collectionNames[collectionName] != nil
}

func (r *replicateChannelHandler) getPartitionID(sourceCollectionID, sourcePartitionID int64, info *model.TargetCollectionInfo, name string) (int64, error) {
Expand Down Expand Up @@ -1340,6 +1345,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
sourceCollectionID := int64(-1)
sourceCollectionName := ""
forwardChannel := ""
streamPChannel := ""

for _, msg := range pack.Msgs {
if forward {
Expand Down Expand Up @@ -1507,6 +1513,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}
originPosition := msg.Position()
originPositionPChannel := funcutil.ToPhysicalChannel(originPosition.GetChannelName())
streamPChannel = originPositionPChannel
positionChannel := info.PChannel
if IsVirtualChannel(originPosition.GetChannelName()) {
positionChannel = info.VChannel
Expand Down Expand Up @@ -1544,7 +1551,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}

if forwardChannel != "" {
r.forwardMsgFunc(forwardChannel, api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack))
r.forwardMsgFunc(forwardChannel, api.GetReplicateMsg(streamPChannel, sourceCollectionName, sourceCollectionID, newPack))
return api.EmptyMsgPack
}

Expand Down Expand Up @@ -1583,7 +1590,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
resetLastTs := needTsMsg
needTsMsg = needTsMsg || len(newPack.Msgs) == 0
if !needTsMsg {
return api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack)
return api.GetReplicateMsg("", sourceCollectionName, sourceCollectionID, newPack)
}
timeTickResult := &msgpb.TimeTickMsg{
Base: commonpbutil.NewMsgBase(
Expand All @@ -1608,7 +1615,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
msgTime, _ := tsoutil.ParseHybridTs(generateTS)
TSMetricVec.WithLabelValues(r.targetPChannel).Set(float64(msgTime))
r.ttRateLog.Debug("time tick msg", zap.String("channel", r.targetPChannel), zap.Uint64("max_ts", generateTS))
return api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack)
return api.GetReplicateMsg("", sourceCollectionName, sourceCollectionID, newPack)
}

func resetMsgPackTimestamp(pack *msgstream.MsgPack, newTimestamp uint64) bool {
Expand Down Expand Up @@ -1757,7 +1764,7 @@ func initReplicateChannelHandler(ctx context.Context,
metaOp: metaOp,
streamCreator: streamCreator,
collectionRecords: make(map[int64]*model.TargetCollectionInfo),
collectionNames: make(map[string]int64),
collectionNames: make(map[string]*model.HandlerCollectionInfo),
closeStreamFuncs: make(map[int64]io.Closer),
apiEventChan: apiEventChan,
forwardPackChan: make(chan *api.ReplicateMsg, opts.MessageBufferSize),
Expand Down
17 changes: 9 additions & 8 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,23 +958,24 @@ func (e *MetaCDC) startReplicateDMLMsg(replicateCtx context.Context, info *meta.
_ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, invalid collection name or id", []meta.TaskState{})
return
}
pChannel := msgPack.EndPositions[0].GetChannelName()
if cdcreader.IsVirtualChannel(pChannel) {
pChannel = funcutil.ToPhysicalChannel(pChannel)
streamChannelName := replicateMsg.PChannelName
targetPChannel := msgPack.EndPositions[0].GetChannelName()
if cdcreader.IsVirtualChannel(targetPChannel) {
targetPChannel = funcutil.ToPhysicalChannel(targetPChannel)
}
position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, pChannel, msgPack)
position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, targetPChannel, msgPack)
if err != nil {
taskLog.Warn("fail to handle the replicate message", zap.Any("pack", msgPack), zap.Error(err))
_ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, err:"+err.Error(), []meta.TaskState{})
return
}
msgTime, _ := tsoutil.ParseHybridTs(msgPack.EndTs)
replicateMetric(info, channelName, msgPack, metrics.OPTypeWrite)
replicateMetric(info, streamChannelName, msgPack, metrics.OPTypeWrite)

metaPosition := &meta.PositionInfo{
Time: msgTime,
DataPair: &commonpb.KeyDataPair{
Key: channelName,
Key: streamChannelName,
Data: position,
},
}
Expand All @@ -986,14 +987,14 @@ func (e *MetaCDC) startReplicateDMLMsg(replicateCtx context.Context, info *meta.
metaTargetPosition := &meta.PositionInfo{
Time: msgTime,
DataPair: &commonpb.KeyDataPair{
Key: pChannel,
Key: targetPChannel,
Data: targetPosition,
},
}
if position != nil {
msgCollectionName := replicateMsg.CollectionName
msgCollectionID := replicateMsg.CollectionID
err = writeCallback.UpdateTaskCollectionPosition(msgCollectionID, msgCollectionName, channelName,
err = writeCallback.UpdateTaskCollectionPosition(msgCollectionID, msgCollectionName, streamChannelName,
metaPosition, metaOpPosition, metaTargetPosition)
if err != nil {
log.Warn("fail to update the collection position", zap.Any("pack", msgPack), zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion server/writer_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (w *WriteCallback) UpdateTaskCollectionPosition(collectionID int64, collect
if err != nil {
w.log.Warn("fail to update the collection position",
zap.Int64("collection_id", collectionID),
zap.String("vchannel_name", pChannelName),
zap.String("pchannel_name", pChannelName),
zap.String("position", util.Base64JSON(position)),
zap.Error(err))
return err
Expand Down
Loading