From 2f13aeb5c2ee8ab0ea2c4bd1bd1f7a35381fefeb Mon Sep 17 00:00:00 2001 From: HuangYi Date: Thu, 27 Feb 2025 16:44:06 +0800 Subject: [PATCH 01/10] Problem: version mismatch happen occationally Solution: - make sure don't load iavl version ahead of versiondb --- app/app.go | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/app/app.go b/app/app.go index 04188c7703..4eca50c07f 100644 --- a/app/app.go +++ b/app/app.go @@ -1041,16 +1041,28 @@ func New( } if loadLatest { - if err := app.LoadLatestVersion(); err != nil { - tmos.Exit(err.Error()) + var qmsVersion int64 + if app.qms != nil { + qmsVersion = app.qms.LatestVersion() } - if app.qms != nil { - v1 := app.qms.LatestVersion() - v2 := app.LastBlockHeight() - if v1 > 0 && v1 < v2 { + if qmsVersion == 0 { + if err := app.LoadLatestVersion(); err != nil { + tmos.Exit(err.Error()) + } + } else { + // make sure iavl version is not ahead of versiondb version + app.SetStoreLoader(VersionStoreLoader(qmsVersion)) + + if err := app.LoadLatestVersion(); err != nil { + tmos.Exit(err.Error()) + } + + // still keep the check for safety + iavlVersion := app.LastBlockHeight() + if qmsVersion < iavlVersion { // try to prevent gap being created in versiondb - tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", v1, v2)) + tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", qmsVersion, iavlVersion)) } } @@ -1491,3 +1503,10 @@ func (app *App) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) return app.BaseApp.CheckTx(req) } + +// VersionStoreLoader will be used by default and loads the latest version +func VersionStoreLoader(version int64) baseapp.StoreLoader { + return func(ms storetypes.CommitMultiStore) error { + return ms.LoadVersion(version) + } +} From 2041267b6511913b786f774dc058099cb18e8442 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Thu, 27 Feb 2025 16:47:22 +0800 Subject: [PATCH 02/10] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d4defef05..6204b842b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [#1724](https://github.com/crypto-org-chain/cronos/pull/1724) Include the fix of nonce management in batch tx in ethermint. * [#1748](https://github.com/crypto-org-chain/cronos/pull/1748) Query with GetCFWithTS to compare both timestamp and key to avoid run fixdata multiple times. * (versiondb) [#1751](https://github.com/crypto-org-chain/cronos/pull/1751) Add missing Destroy for read options to properly hold and release options reference. +* [#1759](https://github.com/crypto-org-chain/cronos/pull/1759) Fix version mismatch happen occasionally. ### Improvements From 5eecccbeb9a3c904abb62c409992c5e90db9ef40 Mon Sep 17 00:00:00 2001 From: yihuang Date: Thu, 27 Feb 2025 16:48:49 +0800 Subject: [PATCH 03/10] Update app/app.go Signed-off-by: yihuang --- app/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/app.go b/app/app.go index 4eca50c07f..2c0668c032 100644 --- a/app/app.go +++ b/app/app.go @@ -1504,7 +1504,7 @@ func (app *App) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) return app.BaseApp.CheckTx(req) } -// VersionStoreLoader will be used by default and loads the latest version +// VersionStoreLoader will be used when there's versiondb func VersionStoreLoader(version int64) baseapp.StoreLoader { return func(ms storetypes.CommitMultiStore) error { return ms.LoadVersion(version) From cbb3b58697878a9358f76ebdeeae62334fef9b27 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Thu, 27 Feb 2025 17:17:29 +0800 Subject: [PATCH 04/10] memiavl support write old version idopodently --- memiavl/db.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/memiavl/db.go b/memiavl/db.go index 02566c8a52..f2866540f0 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -560,9 +560,18 @@ func (db *DB) Commit() (int64, error) { if err != nil { return 0, err } - if err := db.wal.Write(entry.index, bz); err != nil { + + lastIndex, err := db.wal.LastIndex() + if err != nil { return 0, err } + if entry.index < lastIndex+1 { + db.logger.Error("commit old version idempotently", "expected", lastIndex+1, "actual", entry.index) + } else { + if err := db.wal.Write(entry.index, bz); err != nil { + return 0, err + } + } } } @@ -591,13 +600,25 @@ func (db *DB) initAsyncCommit() { break } - for _, entry := range entries { + lastIndex, err := db.wal.LastIndex() + if err != nil { + walQuit <- err + return + } + + for i, entry := range entries { bz, err := entry.data.Marshal() if err != nil { walQuit <- err return } - batch.Write(entry.index, bz) + + if entry.index < lastIndex+uint64(i+1) { + db.logger.Error("commit old version idempotently", "expected", lastIndex+uint64(i+1), "actual", entry.index) + continue + } else { + batch.Write(entry.index, bz) + } } if err := db.wal.WriteBatch(&batch); err != nil { From 8b729a82dab3c604b96eb7e240fa236040254018 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 3 Mar 2025 16:25:01 +0800 Subject: [PATCH 05/10] add unit test --- memiavl/db.go | 44 +++++++++++++++++++++----------------- memiavl/db_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 19 deletions(-) diff --git a/memiavl/db.go b/memiavl/db.go index f2866540f0..baffce353d 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -80,6 +80,9 @@ type DB struct { mtx sync.Mutex // worker goroutine IdleTimeout = 5s snapshotWriterPool *pond.WorkerPool + + // reusable write batch + wbatch wal.Batch } type Options struct { @@ -556,21 +559,18 @@ func (db *DB) Commit() (int64, error) { // async wal writing db.walChan <- &entry } else { - bz, err := entry.data.Marshal() + lastIndex, err := db.wal.LastIndex() if err != nil { return 0, err } - lastIndex, err := db.wal.LastIndex() - if err != nil { + db.wbatch.Clear() + if err := writeEntry(&db.wbatch, db.logger, lastIndex, &entry); err != nil { return 0, err } - if entry.index < lastIndex+1 { - db.logger.Error("commit old version idempotently", "expected", lastIndex+1, "actual", entry.index) - } else { - if err := db.wal.Write(entry.index, bz); err != nil { - return 0, err - } + + if err := db.wal.WriteBatch(&db.wbatch); err != nil { + return 0, err } } } @@ -606,19 +606,11 @@ func (db *DB) initAsyncCommit() { return } - for i, entry := range entries { - bz, err := entry.data.Marshal() - if err != nil { + for _, entry := range entries { + if err := writeEntry(&batch, db.logger, lastIndex, entry); err != nil { walQuit <- err return } - - if entry.index < lastIndex+uint64(i+1) { - db.logger.Error("commit old version idempotently", "expected", lastIndex+uint64(i+1), "actual", entry.index) - continue - } else { - batch.Write(entry.index, bz) - } } if err := db.wal.WriteBatch(&batch); err != nil { @@ -1114,3 +1106,17 @@ func channelBatchRecv[T any](ch <-chan *T) []*T { return result } + +func writeEntry(batch *wal.Batch, logger Logger, lastIndex uint64, entry *walEntry) error { + bz, err := entry.data.Marshal() + if err != nil { + return err + } + + if entry.index <= lastIndex { + logger.Error("commit old version idempotently", "lastIndex", lastIndex, "version", entry.index) + } else { + batch.Write(entry.index, bz) + } + return nil +} diff --git a/memiavl/db_test.go b/memiavl/db_test.go index a59f8c3886..c86f3a1aba 100644 --- a/memiavl/db_test.go +++ b/memiavl/db_test.go @@ -3,6 +3,7 @@ package memiavl import ( "encoding/hex" "errors" + fmt "fmt" "os" "path/filepath" "runtime/debug" @@ -497,3 +498,55 @@ func TestRepeatedApplyChangeSet(t *testing.T) { }) require.NoError(t, err) } + +func TestIdempotentWrite(t *testing.T) { + dir := t.TempDir() + db, err := Load(dir, Options{CreateIfMissing: true, InitialStores: []string{"test1", "test2"}}) + require.NoError(t, err) + + // generate some data into db + var changes [][]*NamedChangeSet + for i := 0; i < 10; i++ { + cs := []*NamedChangeSet{ + { + Name: "test1", + Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))}, + }, + { + Name: "test2", + Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))}, + }, + } + changes = append(changes, cs) + } + + for _, cs := range changes { + require.NoError(t, db.ApplyChangeSets(cs)) + _, err := db.Commit() + require.NoError(t, err) + } + + commitInfo := *db.LastCommitInfo() + require.NoError(t, db.Close()) + + // reload db from disk at an intermidiate version + db, err = Load(dir, Options{TargetVersion: 5}) + require.NoError(t, err) + + // replay some random writes to reach same version + for i := 0; i < 5; i++ { + require.NoError(t, db.ApplyChangeSets(changes[i+5])) + _, err := db.Commit() + require.NoError(t, err) + } + + // it should reach same result + require.Equal(t, commitInfo, *db.LastCommitInfo()) + + require.NoError(t, db.Close()) + + // reload db again, it should reach same result + db, err = Load(dir, Options{}) + require.NoError(t, err) + require.Equal(t, commitInfo, *db.LastCommitInfo()) +} From 6eb73a1eb46f488cafc0f173d28b078ec3b720da Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 3 Mar 2025 16:51:50 +0800 Subject: [PATCH 06/10] fix upgrade store loader --- app/app.go | 57 +++++++++++++++++++++---------------------------- app/upgrades.go | 9 ++++++-- 2 files changed, 31 insertions(+), 35 deletions(-) diff --git a/app/app.go b/app/app.go index 2c0668c032..f375a369ea 100644 --- a/app/app.go +++ b/app/app.go @@ -959,9 +959,27 @@ func New( panic(err) } + // wire up the versiondb's `StreamingService` and `MultiStore`. + if cast.ToBool(appOpts.Get("versiondb.enable")) { + var err error + app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys) + if err != nil { + panic(err) + } + } + + var qmsVersion int64 + if app.qms != nil { + qmsVersion = app.qms.LatestVersion() + } + // RegisterUpgradeHandlers is used for registering any on-chain upgrades. // Make sure it's called after `app.mm` and `app.configurator` are set. - app.RegisterUpgradeHandlers(app.appCodec) + storeLoaderOverritten := app.RegisterUpgradeHandlers(app.appCodec, qmsVersion) + if !storeLoaderOverritten { + // Register the default store loader + app.SetStoreLoader(MaxVersionStoreLoader(qmsVersion)) + } // add test gRPC service for testing gRPC queries in isolation // testdata.RegisterQueryServer(app.GRPCQueryRouter(), testdata.QueryImpl{}) @@ -992,15 +1010,6 @@ func New( app.MountMemoryStores(memKeys) app.MountObjectStores(okeys) - // wire up the versiondb's `StreamingService` and `MultiStore`. - if cast.ToBool(appOpts.Get("versiondb.enable")) { - var err error - app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys) - if err != nil { - panic(err) - } - } - // initialize BaseApp app.SetInitChainer(app.InitChainer) app.SetPreBlocker(app.PreBlocker) @@ -1041,24 +1050,13 @@ func New( } if loadLatest { - var qmsVersion int64 - if app.qms != nil { - qmsVersion = app.qms.LatestVersion() + if err := app.LoadLatestVersion(); err != nil { + tmos.Exit(err.Error()) } - if qmsVersion == 0 { - if err := app.LoadLatestVersion(); err != nil { - tmos.Exit(err.Error()) - } - } else { - // make sure iavl version is not ahead of versiondb version - app.SetStoreLoader(VersionStoreLoader(qmsVersion)) - - if err := app.LoadLatestVersion(); err != nil { - tmos.Exit(err.Error()) - } - - // still keep the check for safety + if qmsVersion > 0 { + // it should not happens since we constraint the loaded iavl version to not exceed the versiondb version, + // still keep the check for safety. iavlVersion := app.LastBlockHeight() if qmsVersion < iavlVersion { // try to prevent gap being created in versiondb @@ -1503,10 +1501,3 @@ func (app *App) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) return app.BaseApp.CheckTx(req) } - -// VersionStoreLoader will be used when there's versiondb -func VersionStoreLoader(version int64) baseapp.StoreLoader { - return func(ms storetypes.CommitMultiStore) error { - return ms.LoadVersion(version) - } -} diff --git a/app/upgrades.go b/app/upgrades.go index 61166895ee..532b1b7e71 100644 --- a/app/upgrades.go +++ b/app/upgrades.go @@ -17,7 +17,8 @@ import ( evmtypes "github.com/evmos/ethermint/x/evm/types" ) -func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec) { +// RegisterUpgradeHandlers returns if store loader is overridden +func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec, maxVersion int64) bool { planName := "v1.4" app.UpgradeKeeper.SetUpgradeHandler(planName, func(ctx context.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) { m, err := app.ModuleManager.RunMigrations(ctx, app.configurator, fromVM) @@ -54,14 +55,18 @@ func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec) { } if !app.UpgradeKeeper.IsSkipHeight(upgradeInfo.Height) { if upgradeInfo.Name == planName { - app.SetStoreLoader(upgradetypes.UpgradeStoreLoader(upgradeInfo.Height, &storetypes.StoreUpgrades{ + app.SetStoreLoader(MaxVersionUpgradeStoreLoader(maxVersion, upgradeInfo.Height, &storetypes.StoreUpgrades{ Added: []string{ icahosttypes.StoreKey, }, Deleted: []string{"icaauth"}, })) + + return true } } + + return false } func UpdateExpeditedParams(ctx context.Context, gov govkeeper.Keeper) error { From f1313c36e4c9046d59b3fff4b099b8b8b338f4f3 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 3 Mar 2025 17:22:15 +0800 Subject: [PATCH 07/10] reproduce in integration test --- app/storeloader.go | 37 +++++++++++++++++++++++++++++ integration_tests/shell.nix | 1 + integration_tests/test_versiondb.py | 10 ++++++-- memiavl/db.go | 3 ++- 4 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 app/storeloader.go diff --git a/app/storeloader.go b/app/storeloader.go new file mode 100644 index 0000000000..4bf42cab45 --- /dev/null +++ b/app/storeloader.go @@ -0,0 +1,37 @@ +package app + +import ( + storetypes "cosmossdk.io/store/types" + upgradetypes "cosmossdk.io/x/upgrade/types" + "github.com/cosmos/cosmos-sdk/baseapp" +) + +// MaxVersionStoreLoader will be used when there's versiondb to cap the loaded iavl version +func MaxVersionStoreLoader(version int64) baseapp.StoreLoader { + if version == 0 { + return baseapp.DefaultStoreLoader + } + + return func(ms storetypes.CommitMultiStore) error { + return ms.LoadVersion(version) + } +} + +// MaxVersionUpgradeStoreLoader is used to prepare baseapp with a fixed StoreLoader +func MaxVersionUpgradeStoreLoader(version int64, upgradeHeight int64, storeUpgrades *storetypes.StoreUpgrades) baseapp.StoreLoader { + if version == 0 { + return upgradetypes.UpgradeStoreLoader(upgradeHeight, storeUpgrades) + } + + return func(ms storetypes.CommitMultiStore) error { + if upgradeHeight == ms.LastCommitID().Version+1 { + // Check if the current commit version and upgrade height matches + if len(storeUpgrades.Renamed) > 0 || len(storeUpgrades.Deleted) > 0 || len(storeUpgrades.Added) > 0 { + return ms.LoadLatestVersionAndUpgrade(storeUpgrades) + } + } + + // Otherwise load default store loader + return MaxVersionStoreLoader(version)(ms) + } +} diff --git a/integration_tests/shell.nix b/integration_tests/shell.nix index 0eac66d989..3f028d4299 100644 --- a/integration_tests/shell.nix +++ b/integration_tests/shell.nix @@ -17,5 +17,6 @@ pkgs.mkShell { shellHook = '' mkdir ./coverage export GOCOVERDIR=./coverage + export TMPDIR=/tmp ''; } diff --git a/integration_tests/test_versiondb.py b/integration_tests/test_versiondb.py index eece5eebf7..a6b19c2ab3 100644 --- a/integration_tests/test_versiondb.py +++ b/integration_tests/test_versiondb.py @@ -5,7 +5,7 @@ from pystarport import ports from .network import Cronos -from .utils import ADDRS, send_transaction, wait_for_port +from .utils import ADDRS, send_transaction, w3_wait_for_new_blocks, wait_for_port def test_versiondb_migration(cronos: Cronos): @@ -37,6 +37,9 @@ def test_versiondb_migration(cronos: Cronos): balance1 = w3.eth.get_balance(community) block1 = w3.eth.block_number + # wait for a few blocks + w3_wait_for_new_blocks(w3, 5) + # stop the network first print("stop all nodes") print(cronos.supervisorctl("stop", "all")) @@ -45,7 +48,10 @@ def test_versiondb_migration(cronos: Cronos): changeset_dir = tempfile.mkdtemp(dir=cronos.base_dir) print("dump to:", changeset_dir) - print(cli1.changeset_dump(changeset_dir)) + + # only restore to an intermidiate version to test version mismatch behavior + print(cli1.changeset_dump(changeset_dir, end_version=block1 + 1)) + snapshot_dir = tempfile.mkdtemp(dir=cronos.base_dir) print("verify and save to snapshot:", snapshot_dir) _, commit_info = cli0.changeset_verify(changeset_dir, save_snapshot=snapshot_dir) diff --git a/memiavl/db.go b/memiavl/db.go index baffce353d..0fc14458c0 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -762,7 +762,8 @@ func (db *DB) rewriteSnapshotBackground() error { cloned.logger.Info("start rewriting snapshot", "version", cloned.Version()) if err := cloned.RewriteSnapshotWithContext(ctx); err != nil { - ch <- snapshotResult{err: err} + // write error log but don't stop the client, it could happen when load an old version. + cloned.logger.Error("failed to rewrite snapshot", "err", err) return } cloned.logger.Info("finished rewriting snapshot", "version", cloned.Version()) From 693d382d9919f3c2e05f9aa8aed94a8605f23002 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 3 Mar 2025 17:33:02 +0800 Subject: [PATCH 08/10] fix memiavl --- memiavl/db.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/memiavl/db.go b/memiavl/db.go index 0fc14458c0..aef5025ca6 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -443,8 +443,13 @@ func (db *DB) checkBackgroundSnapshotRewrite() error { db.snapshotRewriteCancel = nil if result.mtree == nil { - // background snapshot rewrite failed - return fmt.Errorf("background snapshot rewriting failed: %w", result.err) + if result.err != nil { + // background snapshot rewrite failed + return fmt.Errorf("background snapshot rewriting failed: %w", result.err) + } + + // background snapshot rewrite don't success, but no error to propogate, ignore it. + return nil } // wait for potential pending wal writings to finish, to make sure we catch up to latest state. From 05dc62c36ad42ef75c400bd7f0b64fbf9d916c45 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 4 Mar 2025 08:26:49 +0800 Subject: [PATCH 09/10] better test --- memiavl/db_test.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/memiavl/db_test.go b/memiavl/db_test.go index c86f3a1aba..a2a3070771 100644 --- a/memiavl/db_test.go +++ b/memiavl/db_test.go @@ -500,8 +500,26 @@ func TestRepeatedApplyChangeSet(t *testing.T) { } func TestIdempotentWrite(t *testing.T) { + for _, asyncCommit := range []bool{false, true} { + t.Run(fmt.Sprintf("asyncCommit=%v", asyncCommit), func(t *testing.T) { + testIdempotentWrite(t, asyncCommit) + }) + } +} + +func testIdempotentWrite(t *testing.T, asyncCommit bool) { dir := t.TempDir() - db, err := Load(dir, Options{CreateIfMissing: true, InitialStores: []string{"test1", "test2"}}) + + asyncCommitBuffer := -1 + if asyncCommit { + asyncCommitBuffer = 10 + } + + db, err := Load(dir, Options{ + CreateIfMissing: true, + InitialStores: []string{"test1", "test2"}, + AsyncCommitBuffer: asyncCommitBuffer, + }) require.NoError(t, err) // generate some data into db From 008b155cfcde1942fe01e7d9ced4409b7b947a62 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 4 Mar 2025 09:34:17 +0800 Subject: [PATCH 10/10] fix lint --- memiavl/db.go | 2 +- memiavl/db_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/memiavl/db.go b/memiavl/db.go index aef5025ca6..8ef6e7cc46 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -448,7 +448,7 @@ func (db *DB) checkBackgroundSnapshotRewrite() error { return fmt.Errorf("background snapshot rewriting failed: %w", result.err) } - // background snapshot rewrite don't success, but no error to propogate, ignore it. + // background snapshot rewrite don't success, but no error to propagate, ignore it. return nil } diff --git a/memiavl/db_test.go b/memiavl/db_test.go index a2a3070771..ea7a39a86f 100644 --- a/memiavl/db_test.go +++ b/memiavl/db_test.go @@ -547,7 +547,7 @@ func testIdempotentWrite(t *testing.T, asyncCommit bool) { commitInfo := *db.LastCommitInfo() require.NoError(t, db.Close()) - // reload db from disk at an intermidiate version + // reload db from disk at an intermediate version db, err = Load(dir, Options{TargetVersion: 5}) require.NoError(t, err)