Skip to content

Commit

Permalink
improve the db usage way in the create request
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Oct 22, 2024
1 parent 949cb3b commit a1c2d45
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 103 deletions.
73 changes: 34 additions & 39 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,13 @@ jobs:
"extra_info": {
"enable_user_role": true
},
"database_info": {
"name": "*"
},
"collection_infos": [
{
"name": "*"
}
]
"db_collections": {
"*": [
{
"name": "*"
}
]
}
}
}'
Expand Down Expand Up @@ -262,14 +261,13 @@ jobs:
"extra_info": {
"enable_user_role": true
},
"database_info": {
"name": "*"
},
"collection_infos": [
{
"name": "*"
}
]
"db_collections": {
"*": [
{
"name": "*"
}
]
}
}
}'
Expand Down Expand Up @@ -405,14 +403,13 @@ jobs:
"extra_info": {
"enable_user_role": true
},
"database_info": {
"name": "*"
},
"collection_infos": [
{
"name": "*"
}
]
"db_collections": {
"*": [
{
"name": "*"
}
]
}
}
}'
Expand Down Expand Up @@ -548,14 +545,13 @@ jobs:
"extra_info": {
"enable_user_role": true
},
"database_info": {
"name": "*"
},
"collection_infos": [
{
"name": "*"
}
]
"db_collections": {
"*": [
{
"name": "*"
}
]
}
}
}'
Expand Down Expand Up @@ -689,13 +685,12 @@ jobs:
"extra_info": {
"enable_user_role": true
},
"collection_infos": [
{
"name": "*"
}
],
"database_info": {
"name": "foo"
"db_collections": {
"foo": [
{
"name": "*"
}
]
}
}
}'
Expand Down
143 changes: 94 additions & 49 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,23 +234,6 @@ func getTaskUniqueIDFromReq(req *request.CreateRequest) string {
panic("fail to get the task unique id")
}

func getDatabaseName(i any) string {
switch r := i.(type) {
case *meta.TaskInfo:
if r.DatabaseInfo.Name != "" {
return r.DatabaseInfo.Name
}
return cdcreader.DefaultDatabase
case *request.CreateRequest:
if r.DatabaseInfo.Name != "" {
return r.DatabaseInfo.Name
}
return cdcreader.DefaultDatabase
default:
panic("invalid type")
}
}

func getFullCollectionName(collectionName string, databaseName string) string {
return fmt.Sprintf("%s.%s", databaseName, collectionName)
}
Expand Down Expand Up @@ -338,10 +321,20 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
return nil, err
}
uKey := getTaskUniqueIDFromReq(req)
databaseName := getDatabaseName(req)
newCollectionNames := lo.Map(req.CollectionInfos, func(t model.CollectionInfo, _ int) string {
return getFullCollectionName(t.Name, databaseName)
})
var newCollectionNames []string
if len(req.CollectionInfos) > 0 {
newCollectionNames = lo.Map(req.CollectionInfos, func(t model.CollectionInfo, _ int) string {
return getFullCollectionName(t.Name, cdcreader.DefaultDatabase)
})
}
if len(req.DBCollections) > 0 {
for db, infos := range req.DBCollections {
for _, t := range infos {
newCollectionNames = append(newCollectionNames, getFullCollectionName(t.Name, db))
}
}
}

excludeCollectionNames, err := e.checkDuplicateCollection(uKey, newCollectionNames, req.ExtraInfo)
if err != nil {
return nil, err
Expand All @@ -367,8 +360,8 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
TaskID: e.getUUID(),
MilvusConnectParam: req.MilvusConnectParam,
KafkaConnectParam: req.KafkaConnectParam,
DatabaseInfo: req.DatabaseInfo,
CollectionInfos: req.CollectionInfos,
DBCollections: req.DBCollections,
RPCRequestChannelInfo: req.RPCChannelInfo,
ExtraInfo: req.ExtraInfo,
ExcludeCollections: excludeCollectionNames,
Expand Down Expand Up @@ -517,9 +510,33 @@ func (e *MetaCDC) validCreateRequest(req *request.CreateRequest) error {
return servererror.NewClientError("the cache size is less zero")
}

if err := e.checkCollectionInfos(req.CollectionInfos); err != nil {
if len(req.CollectionInfos) == 0 && len(req.DBCollections) == 0 {
return servererror.NewClientError("the collection info is empty")
}
if len(req.CollectionInfos) > 1 || len(req.DBCollections) > 1 {
return servererror.NewClientError("the collection info should be only one")
}
if len(req.CollectionInfos) == 1 && len(req.DBCollections) == 1 {
return servererror.NewClientError("the collection info and db collection info should be only one")
}
var err error
if len(req.CollectionInfos) == 1 {
err = e.checkCollectionInfos(req.CollectionInfos)
} else if len(req.DBCollections) == 1 {
for db, infos := range req.DBCollections {
if len(db) > e.config.MaxNameLength {
return servererror.NewClientError(fmt.Sprintf("the db name length exceeds %d characters, %s", e.config.MaxNameLength, db))
}
err = e.checkCollectionInfos(infos)
if err != nil {
break
}
}
}
if err != nil {
return err
}

if req.RPCChannelInfo.Name != "" && req.RPCChannelInfo.Name != e.config.SourceConfig.ReplicateChan {
return servererror.NewClientError("the rpc channel is invalid, the channel name should be the same as the source config")
}
Expand Down Expand Up @@ -557,11 +574,6 @@ func (e *MetaCDC) checkCollectionInfos(infos []model.CollectionInfo) error {
return servererror.NewClientError("empty collection info")
}

// if len(infos) != 1 || infos[0].Name != cdcreader.AllCollection {
// return servererror.NewClientError("the collection info should be only one, and the collection name should be `*`. Specifying collection name will be supported in the future.")
// }
// return nil

if len(infos) != 1 {
return servererror.NewClientError("the collection info should be only one.")
}
Expand Down Expand Up @@ -1016,10 +1028,10 @@ func replicateMetric(info *meta.TaskInfo, channelName string, msgPack *msgstream

func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *ReplicateEntity, channelName, channelPosition string) (api.Reader, error) {
taskLog := log.With(zap.String("task_id", info.TaskID))
collectionName := info.CollectionNames()[0]
databaseName := getDatabaseName(info)
isAnyCollection := collectionName == cdcreader.AllCollection
isAnyDatabase := databaseName == cdcreader.AllDatabase
// collectionName := info.CollectionNames()[0]
// databaseName := getDatabaseName(info)
// isAnyCollection := collectionName == cdcreader.AllCollection
// isAnyDatabase := databaseName == cdcreader.AllDatabase
// isTmpCollection := collectionName == model.TmpCollectionName

dataHandleFunc := func(funcCtx context.Context, pack *msgstream.MsgPack) bool {
Expand All @@ -1045,10 +1057,15 @@ func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *Replica
if extraSkip {
return true
}
} else if (!isAnyCollection && msgCollectionName != collectionName) ||
(!isAnyDatabase && msgDatabaseName != databaseName) {
// skip the message if the collection name is not equal to the task collection name
return true
} else {
// skip the msg when db or collection name is not matched
collectionInfos := GetCollectionInfos(info, msgDatabaseName)
if collectionInfos == nil {
return true
}
if msgCollectionName != "" && !MatchCollection(info, collectionInfos, msgDatabaseName, msgCollectionName) {
return true
}
}

positionBytes, err := replicateEntity.writerObj.HandleOpMessagePack(funcCtx, pack)
Expand Down Expand Up @@ -1328,25 +1345,53 @@ func (e *MetaCDC) Maintenance(req *request.MaintenanceRequest) (*request.Mainten
}

func GetShouldReadFunc(taskInfo *meta.TaskInfo) cdcreader.ShouldReadFunc {
isAllCollection := taskInfo.CollectionInfos[0].Name == cdcreader.AllCollection
databaseName := getDatabaseName(taskInfo)
isAllDataBase := databaseName == cdcreader.AllDatabase
return func(databaseInfo *coremodel.DatabaseInfo, collectionInfo *pb.CollectionInfo) bool {
currentCollectionName := collectionInfo.Schema.Name
if databaseInfo.Dropped {
log.Info("database is dropped", zap.String("database", databaseInfo.Name), zap.String("collection", currentCollectionName))
return false
}
taskCollectionInfos := GetCollectionInfos(taskInfo, databaseInfo.Name)
if taskCollectionInfos == nil {
return false
}
return MatchCollection(taskInfo, taskCollectionInfos, databaseInfo.Name, currentCollectionName)
}
}

notStarContains := !isAllCollection && lo.ContainsBy(taskInfo.CollectionInfos, func(taskCollectionInfo model.CollectionInfo) bool {
return taskCollectionInfo.Name == currentCollectionName
})
starContains := isAllCollection && !lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool {
return s == currentCollectionName
})
dbMatch := isAllDataBase ||
taskInfo.DatabaseInfo.Name == databaseInfo.Name

return (notStarContains || starContains) && dbMatch
func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string) []model.CollectionInfo {
var taskCollectionInfos []model.CollectionInfo
if len(taskInfo.CollectionInfos) > 0 {
if dbName != cdcreader.DefaultDatabase {
return nil
}
taskCollectionInfos = taskInfo.CollectionInfos
}
if len(taskInfo.DBCollections) > 0 {
taskCollectionInfos = taskInfo.DBCollections[dbName]
if taskCollectionInfos == nil {
isExclude := lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool {
db, _ := getCollectionNameFromFull(s)
return db == dbName
})
if isExclude {
return nil
}
taskCollectionInfos = taskInfo.DBCollections[cdcreader.AllDatabase]
}
}
return taskCollectionInfos
}

func MatchCollection(taskInfo *meta.TaskInfo, taskCollectionInfos []model.CollectionInfo, currentDatabaseName, currentCollectionName string) bool {
isAllCollection := taskCollectionInfos[0].Name == cdcreader.AllCollection

notStarContains := !isAllCollection && lo.ContainsBy(taskCollectionInfos, func(taskCollectionInfo model.CollectionInfo) bool {
return taskCollectionInfo.Name == currentCollectionName
})
starContains := isAllCollection && !lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool {
match, _ := matchCollectionName(s, getFullCollectionName(currentCollectionName, currentDatabaseName))
return match
})
return notStarContains || starContains
}
25 changes: 18 additions & 7 deletions server/cdc_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,18 +761,22 @@ func TestShouldReadCollection(t *testing.T) {
Name: "*",
},
},
ExcludeCollections: []string{"foo"},
ExcludeCollections: []string{"default.foo"},
})
assert.True(t, f(
&coremodel.DatabaseInfo{},
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "hoo",
},
}))

assert.False(t, f(
&coremodel.DatabaseInfo{},
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "foo",
Expand All @@ -781,6 +785,7 @@ func TestShouldReadCollection(t *testing.T) {

assert.False(t, f(
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
Dropped: true,
},
&pb.CollectionInfo{
Expand All @@ -800,24 +805,30 @@ func TestShouldReadCollection(t *testing.T) {
Name: "b",
},
},
ExcludeCollections: []string{"foo"},
ExcludeCollections: []string{"default.foo"},
})
assert.True(t, f(
&coremodel.DatabaseInfo{},
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "a",
},
}))
assert.False(t, f(
&coremodel.DatabaseInfo{},
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "c",
},
}))
assert.False(t, f(
&coremodel.DatabaseInfo{},
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "foo",
Expand Down
2 changes: 1 addition & 1 deletion server/model/meta/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type TaskInfo struct {
KafkaConnectParam model.KafkaConnectParam
WriterCacheConfig model.BufferConfig
CollectionInfos []model.CollectionInfo
DatabaseInfo model.DatabaseInfo
DBCollections map[string][]model.CollectionInfo
RPCRequestChannelInfo model.ChannelInfo
ExtraInfo model.ExtraInfo
ExcludeCollections []string // it's used for the `*` collection name
Expand Down
Loading

0 comments on commit a1c2d45

Please sign in to comment.