Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix 2.4.8 #396

Merged
merged 13 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
2020-07-29 Alibaba Cloud.
* version: 2.4.8
* BUGFIX: fix bug that checkpoint.storage.db does not work since v2.4.6.
    * BUGFIX: fix core-dump bug in the orphan document filter. #389.
* IMPROVE: remove listDatabase request when incr_sync.shard_key != auto.
#385.
* IMPROVE: add oplog_max_size and oplog_avg_size metric in restful: "repl/".
* BUGFIX: fix bug of checkpoint duplicate updating when HA switch.
    * IMPROVE: increase syncer threads when the fetching method is change stream.
* IMPROVE: add readConcern and writeConcern in client to solve the
orphan document problem reading from MongoS. #392
* IMPROVE: fix wrong name usage in mongoshake-stat script, thanks @Neal
Gosalia. #393.

2020-06-30 Alibaba Cloud.
* version: 2.4.7
* IMPROVE: add exit-point and safe shutdown mechanism. #375, #162
Expand Down
6 changes: 2 additions & 4 deletions conf/collector.conf
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,8 @@ filter.ddl_enable = false
# checkpoint info, used in resuming from break point.
# checkpoint存储信息,用于支持断点续传。
# context.storage.url is used to mark the checkpoint store database. E.g., mongodb://127.0.0.1:20070
# if not set, checkpoint will be written into source mongodb when source mongodb is replica-set(db=mongoshake),
# when source mongodb is sharding, the checkpoint will be written into config-server(db=admin)
# checkpoint的具体写入的MongoDB地址,如果不配置,对于副本集将写入源库(db=mongoshake),对于分片集
# 将写入config-server(db=admin)
# if not set, checkpoint will be written into source mongodb(db=mongoshake)
# checkpoint的具体写入的MongoDB地址,如果不配置,对于副本集和分片集群都将写入源库(db=mongoshake)
checkpoint.storage.url =
# checkpoint db's name.
# checkpoint存储的db的名字
Expand Down
4 changes: 2 additions & 2 deletions scripts/mongoshake-stat
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __crash(message):


def usage():
print "Usage: ./blsstat [--host=127.0.0.1] [--port=8080] [--tables] [--queue]"
print "Usage: ./mongoshake-stat [--host=127.0.0.1] [--port=8080] [--tables] [--queue]"
exit(0)


Expand Down Expand Up @@ -217,7 +217,7 @@ if __name__ == "__main__":

elif VERBOSE_TABLE:
if module != MODULE_RECEIVER:
print "blsstat --tables not support collector"
print "Command `mongoshake-stat --tables` is not supported by collector"
else:
# show tables operations
#
Expand Down
258 changes: 132 additions & 126 deletions src/mongoshake/collector/batcher_test.go

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions src/mongoshake/collector/ckpt/ckpt_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ func NewCheckpointManager(name string, startPosition int64) *CheckpointManager {
URL: conf.Options.CheckpointStorageCollection,
}
case utils.VarCheckpointStorageDatabase:
db := utils.AppDatabase
/*if conf.Options.IsShardCluster() {
db = utils.VarCheckpointStorageDbShardingDefault
}*/
db := conf.Options.CheckpointStorageDb
newManager.delegate = &MongoCheckpoint{
CheckpointContext: CheckpointContext{
Name: name,
Expand Down
8 changes: 2 additions & 6 deletions src/mongoshake/collector/ckpt/ckpt_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"fmt"

"mongoshake/collector/configure"
"mongoshake/common"

LOG "github.com/vinllen/log4go"
Expand Down Expand Up @@ -67,7 +66,8 @@ type MongoCheckpoint struct {
func (ckpt *MongoCheckpoint) ensureNetwork() bool {
// make connection if we haven't already established one
if ckpt.Conn == nil {
if conn, err := utils.NewMongoConn(ckpt.URL, utils.VarMongoConnectModePrimary, true); err == nil {
if conn, err := utils.NewMongoConn(ckpt.URL, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority); err == nil {
ckpt.Conn = conn
ckpt.QueryHandle = conn.Session.DB(ckpt.DB).C(ckpt.Table)
} else {
Expand All @@ -76,10 +76,6 @@ func (ckpt *MongoCheckpoint) ensureNetwork() bool {
}
}

// set WriteMajority while checkpoint is writing to ConfigServer
if conf.Options.IsShardCluster() {
ckpt.Conn.Session.EnsureSafe(&mgo.Safe{WMode: utils.MajorityWriteConcern})
}
return true
}

Expand Down
21 changes: 16 additions & 5 deletions src/mongoshake/collector/ckpt/ckpt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand All @@ -53,9 +54,11 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -98,9 +101,11 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -134,9 +139,12 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault
utils.FcvCheckpoint.CurrentVersion = 1

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -189,9 +197,12 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault
utils.FcvCheckpoint.CurrentVersion = 1

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down
23 changes: 14 additions & 9 deletions src/mongoshake/collector/coordinator/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func fetchIndexes(sourceList []*utils.MongoSource) (map[utils.NS][]mgo.Index, er
}

// 2. build connection
conn, err := utils.NewMongoConn(src.URL, utils.VarMongoConnectModeSecondaryPreferred, true)
conn, err := utils.NewMongoConn(src.URL, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault)
if err != nil {
return nil, fmt.Errorf("source[%v %v] build connection failed: %v", src.ReplicaName, src.URL, err)
}
Expand Down Expand Up @@ -109,12 +110,7 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
// the source is sharding or replica-set
// fromIsSharding := len(coordinator.Sources) > 1 || fromConn0.IsMongos()

fromIsSharding := false
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
fromIsSharding = coordinator.MongoS != nil
} else {
fromIsSharding = len(conf.Options.MongoUrls) > 1
}
fromIsSharding := coordinator.SourceIsSharding()

var shardingChunkMap sharding.ShardingChunkMap
var err error
Expand Down Expand Up @@ -146,7 +142,8 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
// create target client
toUrl := conf.Options.TunnelAddress[0]
var toConn *utils.MongoConn
if toConn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, true); err != nil {
if toConn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault); err != nil {
return err
}
defer toConn.Close()
Expand Down Expand Up @@ -206,7 +203,7 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
orphanFilter = filter.NewOrphanFilter(src.ReplicaName, dbChunkMap)
}

dbSyncer := docsyncer.NewDBSyncer(i, src.URL, src.ReplicaName, toUrl, trans, orphanFilter, qos)
dbSyncer := docsyncer.NewDBSyncer(i, src.URL, src.ReplicaName, toUrl, trans, orphanFilter, qos, fromIsSharding)
dbSyncer.Init()
LOG.Info("document syncer-%d do replication for url=%v", i, src.URL)

Expand Down Expand Up @@ -269,3 +266,11 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
LOG.Info("document syncer sync end")
return nil
}

// SourceIsSharding reports whether the replication source is a sharded
// cluster. With the change-stream fetch method the presence of a mongos
// entry point (coordinator.MongoS) is the indicator; with the oplog fetch
// method, more than one configured mongo_urls entry implies sharding.
func (coordinator *ReplicationCoordinator) SourceIsSharding() bool {
	if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
		return coordinator.MongoS != nil
	}
	return len(conf.Options.MongoUrls) > 1
}
4 changes: 3 additions & 1 deletion src/mongoshake/collector/coordinator/incr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func (coordinator *ReplicationCoordinator) startOplogReplication(oplogStartPosit

// prepare all syncer. only one syncer while source is ReplicaSet
// otherwise one syncer connects to one shard
for _, src := range coordinator.RealSourceIncrSync {
LOG.Info("start incr replication")
for i, src := range coordinator.RealSourceIncrSync {
LOG.Info("RealSourceIncrSync[%d]: %s", i, src)
syncer := collector.NewOplogSyncer(src.ReplicaName, oplogStartPosition, fullSyncFinishPosition, src.URL,
src.Gids, coordinator.rateController)
// syncerGroup http api registry
Expand Down
8 changes: 5 additions & 3 deletions src/mongoshake/collector/coordinator/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,17 @@ func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error {

// try to connect CheckpointStorage
checkpointStorageUrl := conf.Options.CheckpointStorageUrl
if conn, err = utils.NewMongoConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true); conn == nil || !conn.IsGood() || err != nil {
if conn, err = utils.NewMongoConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); conn == nil || !conn.IsGood() || err != nil {
LOG.Critical("Connect checkpointStorageUrl[%v] error[%v]. Please add primary node into 'mongo_urls' " +
"if 'context.storage.url' is empty", checkpointStorageUrl, err)
return err
}
conn.Close()

for i, src := range coordinator.MongoD {
if conn, err = utils.NewMongoConn(src.URL, conf.Options.MongoConnectMode, true); conn == nil || !conn.IsGood() || err != nil {
if conn, err = utils.NewMongoConn(src.URL, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); conn == nil || !conn.IsGood() || err != nil {
LOG.Critical("Connect mongo server error. %v, url : %s. See https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-solve-the-oplog-tailer-initialize-failed-no-reachable-servers-error", err, src.URL)
return err
}
Expand Down Expand Up @@ -173,7 +175,7 @@ func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error {
src.ReplicaName = rsName

// look around if there has uniq index
if !hasUniqIndex {
if !hasUniqIndex && conf.Options.IncrSyncShardKey == oplog.ShardAutomatic {
hasUniqIndex = conn.HasUniqueIndex()
}
// doesn't reuse current connection
Expand Down
5 changes: 3 additions & 2 deletions src/mongoshake/collector/docsyncer/doc_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func NewCollectionExecutor(id int, mongoUrl string, ns utils.NS, syncer *DBSynce

func (colExecutor *CollectionExecutor) Start() error {
var err error
if colExecutor.conn, err = utils.NewMongoConn(colExecutor.mongoUrl, utils.VarMongoConnectModePrimary, true); err != nil {
if colExecutor.conn, err = utils.NewMongoConn(colExecutor.mongoUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); err != nil {
return err
}
if conf.Options.FullSyncExecutorMajorityEnable {
Expand Down Expand Up @@ -202,7 +203,7 @@ func (exec *DocExecutor) tryOneByOne(input []interface{}, index int, collectionH
id := oplog.GetKey(docD, "")

// orphan document enable and source is sharding
if conf.Options.FullSyncExecutorFilterOrphanDocument && len(conf.Options.MongoUrls) > 1 {
if conf.Options.FullSyncExecutorFilterOrphanDocument && exec.syncer.orphanFilter != nil {
// judge whether is orphan document, pass if so
if exec.syncer.orphanFilter.Filter(docD, collectionHandler.FullName) {
LOG.Info("orphan document with _id[%v] filter", id)
Expand Down
9 changes: 6 additions & 3 deletions src/mongoshake/collector/docsyncer/doc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func GetAllNamespace(sources []*utils.MongoSource) (map[utils.NS]struct{}, map[s
func GetDbNamespace(url string) ([]utils.NS, map[string][]string, error) {
var err error
var conn *utils.MongoConn
if conn, err = utils.NewMongoConn(url, utils.VarMongoConnectModeSecondaryPreferred, true); conn == nil || err != nil {
if conn, err = utils.NewMongoConn(url, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault); conn == nil || err != nil {
return nil, nil, err
}
defer conn.Close()
Expand Down Expand Up @@ -119,7 +120,8 @@ func NewDocumentSplitter(src string, ns utils.NS) *DocumentSplitter {

// create connection
var err error
ds.conn, err = utils.NewMongoConn(ds.src, conf.Options.MongoConnectMode, true)
ds.conn, err = utils.NewMongoConn(ds.src, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault)
if err != nil {
LOG.Error("splitter[%s] connection mongo[%v] failed[%v]", ds, ds.src, err)
return nil
Expand Down Expand Up @@ -289,7 +291,8 @@ func (reader *DocumentReader) ensureNetwork() (err error) {
reader.conn.Close()
}
// reconnect
if reader.conn, err = utils.NewMongoConn(reader.src, conf.Options.MongoConnectMode, true); reader.conn == nil || err != nil {
if reader.conn, err = utils.NewMongoConn(reader.src, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault); reader.conn == nil || err != nil {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/mongoshake/collector/docsyncer/doc_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func TestGtDbNamespace(t *testing.T) {
fmt.Printf("TestGtDbNamespace case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault)
assert.Equal(t, nil, err, "should be equal")

err = conn.Session.DB("db1").DropDatabase()
Expand Down
31 changes: 18 additions & 13 deletions src/mongoshake/collector/docsyncer/doc_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func StartNamespaceSpecSyncForSharding(csUrl string, toConn *utils.MongoConn,

var fromConn *utils.MongoConn
var err error
if fromConn, err = utils.NewMongoConn(csUrl, utils.VarMongoConnectModePrimary, true); err != nil {
if fromConn, err = utils.NewMongoConn(csUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault); err != nil {
return err
}
defer fromConn.Close()
Expand Down Expand Up @@ -187,7 +188,8 @@ func StartIndexSync(indexMap map[utils.NS][]mgo.Index, toUrl string,

var conn *utils.MongoConn
var err error
if conn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, false); err != nil {
if conn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, false,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernMajority); err != nil {
return err
}
defer conn.Close()
Expand Down Expand Up @@ -254,6 +256,8 @@ type DBSyncer struct {
indexMap map[utils.NS][]mgo.Index
// start time of sync
startTime time.Time
// source is sharding?
FromIsSharding bool

nsTrans *transform.NamespaceTransform
// filter orphan duplicate record
Expand All @@ -277,19 +281,20 @@ func NewDBSyncer(
toMongoUrl string,
nsTrans *transform.NamespaceTransform,
orphanFilter *filter.OrphanFilter,
qos *utils.Qos) *DBSyncer {
qos *utils.Qos,
fromIsSharding bool) *DBSyncer {

syncer := &DBSyncer{
id: id,
FromMongoUrl: fromMongoUrl,
fromReplset: fromReplset,
ToMongoUrl: toMongoUrl,
// indexMap: make(map[utils.NS][]mgo.Index),
nsTrans: nsTrans,
orphanFilter: orphanFilter,
qos: qos,
metricNsMap: make(map[utils.NS]*CollectionMetric),
replMetric: utils.NewMetric(fromReplset, utils.TypeFull, utils.METRIC_TPS),
id: id,
FromMongoUrl: fromMongoUrl,
fromReplset: fromReplset,
ToMongoUrl: toMongoUrl,
nsTrans: nsTrans,
orphanFilter: orphanFilter,
qos: qos,
metricNsMap: make(map[utils.NS]*CollectionMetric),
replMetric: utils.NewMetric(fromReplset, utils.TypeFull, utils.METRIC_TPS),
FromIsSharding: fromIsSharding,
}

return syncer
Expand Down
9 changes: 6 additions & 3 deletions src/mongoshake/collector/docsyncer/doc_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func fetchAllDocument(conn *utils.MongoConn) ([]bson.D, error) {
func TestDbSync(t *testing.T) {
// test doSync

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, false)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, false,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
assert.Equal(t, nil, err, "should be equal")

// init DocExecutor, ignore DBSyncer here
Expand Down Expand Up @@ -364,7 +365,8 @@ func TestStartDropDestCollection(t *testing.T) {
fmt.Printf("TestStartDropDestCollection case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop old db
Expand Down Expand Up @@ -407,7 +409,8 @@ func TestStartDropDestCollection(t *testing.T) {
fmt.Printf("TestStartDropDestCollection case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop old db
Expand Down
Loading