Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

a new dedicated workqueue only for async write commit, and block clients while committing changes #741

Open
wants to merge 1 commit into
base: async_flash
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/config.cpp
Original file line number Diff line number Diff line change
@@ -361,7 +361,7 @@ bool initializeStorageProvider(const char **err)
// Create The Storage Factory (if necessary)
serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)");
adjustOpenFilesLimit();
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue);
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue, &g_pserver->asyncwriteworkqueue);
#else
serverLog(LL_WARNING, "To use the flash storage provider please compile KeyDB with ENABLE_FLASH=yes");
serverLog(LL_WARNING, "Exiting due to the use of an unsupported storage provider");
17 changes: 17 additions & 0 deletions src/db.cpp
Original file line number Diff line number Diff line change
@@ -415,6 +415,13 @@ void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, in
}
incrRefCount(val);
if (signal) signalModifiedKey(c,db,key);

if(g_pserver->m_pstorageFactory != nullptr) {
if (!(c->flags & CLIENT_BLOCKED)) {
blockClient(c, BLOCKED_STORAGE);
}
serverTL->setclientsCommit.insert(c);
}
}

/* Common case for genericSetKey() where the TTL is not retained. */
@@ -3091,9 +3098,18 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback);
if (tok != nullptr)
{
for (client *c : serverTL->setclientsCommit)
{
/* Remove from the list of pending writes if needed. */
if (c->flags & CLIENT_PENDING_WRITE) {
c->flags &= ~CLIENT_PENDING_WRITE;
}
}
tok->setc = std::move(serverTL->setclientsCommit);
tok->db = this;
tok->type = StorageToken::TokenType::BatchWrite;
}
serverTL->setclientsCommit.clear();
}
}

@@ -3416,6 +3432,7 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)
blockClient(c, BLOCKED_STORAGE);
}
tok->setc = std::move(setcBlocked);
tok->type = StorageToken::TokenType::SingleRead;
tok->db = this;
}
return;
3 changes: 3 additions & 0 deletions src/server.cpp
Original file line number Diff line number Diff line change
@@ -4160,6 +4160,9 @@ void InitServerLast() {

g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10);

//Process one write/commit at a time to ensure consistency
g_pserver->asyncwriteworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(1);

// Allocate the repl backlog

}
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
@@ -2218,6 +2218,7 @@ struct redisServerThreadVars {
int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */
std::unordered_set<client*> setclientsProcess;
std::unordered_set<client*> setclientsPrefetch;
std::unordered_set<client*> setclientsCommit;
std::unordered_set<StorageToken*> setStorageTokensProcess;
dictAsyncRehashCtl *rehashCtl = nullptr;

@@ -2705,6 +2706,7 @@ struct redisServer {
uint64_t mvcc_tstamp;

AsyncWorkQueue *asyncworkqueue;
AsyncWorkQueue *asyncwriteworkqueue;

/* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */
2 changes: 1 addition & 1 deletion src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
@@ -266,7 +266,7 @@ StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el
tok->tspdb = m_spdb;
m_spbatch = nullptr;
m_lock.unlock();
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{
(*m_pfactory->m_wworkqueue)->AddWorkFunction([this, el,callback,tok]{
tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch());
aePostFunction(el,callback,tok);
});
3 changes: 2 additions & 1 deletion src/storage/rocksdbfactor_internal.h
Original file line number Diff line number Diff line change
@@ -12,8 +12,9 @@ class RocksDBStorageFactory : public IStorageFactory

public:
AsyncWorkQueue **m_wqueue;
AsyncWorkQueue **m_wworkqueue;

RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue);
RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue);
~RocksDBStorageFactory();

virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
9 changes: 4 additions & 5 deletions src/storage/rocksdbfactory.cpp
Original file line number Diff line number Diff line change
@@ -35,9 +35,8 @@ rocksdb::Options DefaultRocksDBOptions() {
return options;
}

IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
{
return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue);
IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue)
return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue, wworkqueue);
}

rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
@@ -52,8 +51,8 @@ rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
return options;
}

RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
: m_path(dbfile), m_wqueue(wqueue)
RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue)
: m_path(dbfile), m_wqueue(wqueue), m_wworkqueue(wworkqueue)
{
dbnum++; // create an extra db for metadata
// Get the count of column families in the actual database
2 changes: 1 addition & 1 deletion src/storage/rocksdbfactory.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once

class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue);
class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue);