Skip to content

Commit

Permalink
feat: support to replicate rbac message to kafka (#122)
Browse files Browse the repository at this point in the history
* feat: support to replicate rbac message to kafka

* fix: golangci-lint check

* feat: send msg with its type

* fix: kafkaDatahandler dont impl api.Datahandler

* add: use uKey with replicateEntity

* add: dont use targetClient when downstream is kafka

* add: check kafkaConnectParam

* ci: add kafka deployment in unit.yaml

* ci: modify unit.yaml

* ci: modify unit.yaml

* tets: wait for kafka connection before test

* test: add createRequest with kafka test

* revert the unrelated changes

* fix: modify StartReadCollection func

* ci: fix golangci-lint check

* add: getPartitionInfo when downstream is kafka

* ci: fix unit test

* fix: nil pointer error
  • Loading branch information
Ricky-chen1 authored Sep 23, 2024
1 parent 51da848 commit 63180d1
Show file tree
Hide file tree
Showing 14 changed files with 922 additions and 214 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ jobs:
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
ports:
- 2379:2379
kafka:
image: apache/kafka:3.8.0
ports:
- 9092:9092
# pulsar:
# image: apachepulsar/pulsar:2.8.2
# ports:
Expand Down
49 changes: 29 additions & 20 deletions core/reader/etcd_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ func (e *EtcdOp) GetAllDroppedObj() map[string]map[string]uint64 {
return dbName
}

var dbName string
for _, collection := range collections {
collectionName := collection.Schema.Name
originDBName := getDBNameForCollection(collection.ID)
Expand All @@ -822,18 +823,23 @@ func (e *EtcdOp) GetAllDroppedObj() map[string]map[string]uint64 {
continue
}
// maybe the database has been drop, so get the database name from the target
dbName, err := e.targetMilvus.GetDatabaseName(context.Background(), collectionName, originDBName)
if IsDatabaseNotFoundError(err) {
log.Info("the collection info has been dropped in the source and target", zap.String("collection_name", collectionName))
continue
}
if err != nil {
log.Panic("fail to get database name", zap.String("collection_name", collectionName), zap.Error(err))
continue
}
if originDBName != dbName {
_, dropDBKey := util.GetDBInfoKeys(dbName)
res[droppedDatabaseKey][dropDBKey] = tt - 1
// targetMilvus is nil when downstream is not milvus
if e.targetMilvus != nil {
dbName, err = e.targetMilvus.GetDatabaseName(context.Background(), collectionName, originDBName)
if IsDatabaseNotFoundError(err) {
log.Info("the collection info has been dropped in the source and target", zap.String("collection_name", collectionName))
continue
}
if err != nil {
log.Panic("fail to get database name", zap.String("collection_name", collectionName), zap.Error(err))
continue
}
if originDBName != dbName {
_, dropDBKey := util.GetDBInfoKeys(dbName)
res[droppedDatabaseKey][dropDBKey] = tt - 1
}
} else {
dbName = originDBName
}

_, dropKey := util.GetCollectionInfoKeys(collectionName, dbName)
Expand All @@ -856,14 +862,17 @@ func (e *EtcdOp) GetAllDroppedObj() map[string]map[string]uint64 {
log.Panic("fail to get db name for collection", zap.Int64("collection_id", partition.CollectionId))
continue
}
dbName, err := e.targetMilvus.GetDatabaseName(context.Background(), collectionName, originDBName)
if IsDatabaseNotFoundError(err) {
log.Info("the collection info has been dropped in the source and target", zap.String("collection_name", collectionName))
continue
}
if err != nil {
log.Panic("fail to get database name", zap.String("collection_name", collectionName), zap.Error(err))
continue
// targetMilvus is nil when downstream is not milvus
if e.targetMilvus != nil {
dbName, err = e.targetMilvus.GetDatabaseName(context.Background(), collectionName, originDBName)
if IsDatabaseNotFoundError(err) {
log.Info("the collection info has been dropped in the source and target", zap.String("collection_name", collectionName))
continue
}
if err != nil {
log.Panic("fail to get database name", zap.String("collection_name", collectionName), zap.Error(err))
continue
}
}
partitionName := partition.PartitionName
_, dropKey := util.GetPartitionInfoKeys(partitionName, collectionName, dbName)
Expand Down
Loading

0 comments on commit 63180d1

Please sign in to comment.