From bd2455a8bbd83c111733d71ce4ea4fc1f195eb5c Mon Sep 17 00:00:00 2001 From: Qi Zhou Date: Sun, 15 May 2022 19:09:23 -0700 Subject: [PATCH] add sharded storage (sstorage) cmd (#83) * add cmd/storage * add cmd/storage read sub-cmd * add cmd/storage write sub-cmd * add data_shard.go * add shard_read/write for cmd/storage * move sstorage lib files to /sstorage Co-authored-by: Qi Zhou --- cmd/sstorage/main.go | 242 +++++++++++++++++++++++++++++++++++ sstorage/data_file.go | 257 ++++++++++++++++++++++++++++++++++++++ sstorage/data_shard.go | 144 +++++++++++++++++++++ sstorage/shard_config.go | 113 +++++++++++++++++ sstorage/shard_manager.go | 79 ++++++++++++ 5 files changed, 835 insertions(+) create mode 100644 cmd/sstorage/main.go create mode 100644 sstorage/data_file.go create mode 100644 sstorage/data_shard.go create mode 100644 sstorage/shard_config.go create mode 100644 sstorage/shard_manager.go diff --git a/cmd/sstorage/main.go b/cmd/sstorage/main.go new file mode 100644 index 000000000000..6f16ac6f27e9 --- /dev/null +++ b/cmd/sstorage/main.go @@ -0,0 +1,242 @@ +package main + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/sstorage" + "github.com/mattn/go-colorable" + "github.com/mattn/go-isatty" + "github.com/spf13/cobra" +) + +var ( + chunkIdxLen *uint64 + filenames *[]string + + verbosity *int + + chunkIdx *uint64 + readLen *uint64 + readMasked *bool + + shardIdx *uint64 + kvSize *uint64 + kvEntries *uint64 + kvIdx *uint64 +) + +var CreateCmd = &cobra.Command{ + Use: "create", + Short: "Create a data file", + Run: runCreate, +} + +var ChunkReadCmd = &cobra.Command{ + Use: "chunk_read", + Short: "Read a chunk from a data file", + Run: runChunkRead, +} + +var ChunkWriteCmd = &cobra.Command{ + Use: "chunk_write", + Short: "Write a chunk from a data file", + Run: runChunkWrite, +} + +var ShardReadCmd = &cobra.Command{ + Use: "shard_read", + Short: "Read a KV from a data shard", + Run: runShardRead, +} + +var ShardWriteCmd = &cobra.Command{ + Use: "shard_write", + Short: "Write a value to a data shard", + Run: runShardWrite, +} + +func init() { + chunkIdxLen = CreateCmd.Flags().Uint64("len", 0, "Chunk idx len to create") + + filenames = rootCmd.PersistentFlags().StringArray("filename", []string{}, "Data filename") + verbosity = rootCmd.PersistentFlags().Int("verbosity", 3, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail") + chunkIdx = rootCmd.PersistentFlags().Uint64("chunk_idx", 0, "Chunk idx to start/read/write") + + shardIdx = rootCmd.PersistentFlags().Uint64("shard_idx", 0, "Shard idx to read/write") + kvSize = rootCmd.PersistentFlags().Uint64("kv_size", 4096, "Shard KV size to read/write") + kvIdx = rootCmd.PersistentFlags().Uint64("kv_idx", 0, "Shard KV index to read/write") + kvEntries = rootCmd.PersistentFlags().Uint64("kv_entries", 0, "Number of KV entries in the shard") + + readMasked = rootCmd.PersistentFlags().Bool("masked", false, "Read masked or not") + readLen = rootCmd.PersistentFlags().Uint64("readlen", 0, "Bytes to read (only for unmasked read)") +} + +func setupLogger() { + glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) + glogger.Verbosity(log.Lvl(*verbosity)) + log.Root().SetHandler(glogger) + + // setup logger + var ostream log.Handler + output := io.Writer(os.Stderr) + + usecolor := (isatty.IsTerminal(os.Stderr.Fd()) || isatty.IsCygwinTerminal(os.Stderr.Fd())) && os.Getenv("TERM") != "dumb" + if usecolor { + output = colorable.NewColorableStderr() + } + ostream = log.StreamHandler(output, log.TerminalFormat(usecolor)) + + glogger.SetHandler(ostream) +} + +func runCreate(cmd *cobra.Command, args []string) { + setupLogger() + + if len(*filenames) != 1 { + log.Crit("must provide single filename") + } + + _, err := sstorage.Create((*filenames)[0], *chunkIdx, *chunkIdxLen, sstorage.MASK_KECCAK_256) + if err != nil { + log.Crit("create failed", "error", err) + } +} + +func runChunkRead(cmd *cobra.Command, args []string) { + setupLogger() + + if len(*filenames) != 1 { + log.Crit("must provide a filename") + } + + var err error + var df *sstorage.DataFile + df, err = sstorage.OpenDataFile((*filenames)[0]) + if err != nil { + log.Crit("open failed", "error", err) + } + + var b []byte + if *readMasked { + b, err = df.ReadMasked(*chunkIdx) + } else { + b, err = df.ReadUnmasked(*chunkIdx, int(*readLen)) + } + if err != nil { + log.Crit("open failed", "error", err) + } + os.Stdout.Write(b) +} + +func readInputBytes() []byte { + in := bufio.NewReader(os.Stdin) + b := make([]byte, 0) + for { + c, err := in.ReadByte() + if err == io.EOF { + break + } + b = append(b, c) + } + return b +} + +func runChunkWrite(cmd *cobra.Command, args []string) { + setupLogger() + + if len(*filenames) != 1 { + log.Crit("must provide a filename") + } + + var err error + var df *sstorage.DataFile + df, err = sstorage.OpenDataFile((*filenames)[0]) + if err != nil { + log.Crit("open failed", "error", err) + } + + err = df.WriteUnmasked(*chunkIdx, readInputBytes()) + if err != nil { + log.Crit("write failed", "error", err) + } +} + +func runShardRead(cmd *cobra.Command, args []string) { + setupLogger() + + ds := sstorage.NewDataShard(*shardIdx, *kvSize, *kvEntries) + for _, filename := range *filenames { + var err error + var df *sstorage.DataFile + df, err = sstorage.OpenDataFile(filename) + if err != nil { + log.Crit("open failed", "error", err) + } + ds.AddDataFile(df) + } + + if !ds.IsComplete() { + log.Warn("shard is not completed") + } + + var b []byte + var err error + if *readMasked { + b, err = ds.ReadMasked(*kvIdx) + } else { + b, err = ds.ReadUnmasked(*kvIdx, int(*readLen)) + } + if err != nil { + log.Crit("read failed", "error", err) + } + os.Stdout.Write(b) +} + +func runShardWrite(cmd *cobra.Command, args []string) { + setupLogger() + + ds := sstorage.NewDataShard(*shardIdx, *kvSize, *kvEntries) + for _, filename := range *filenames { + var err error + var df *sstorage.DataFile + df, err = sstorage.OpenDataFile(filename) + if err != nil { + log.Crit("open failed", "error", err) + } + ds.AddDataFile(df) + } + + if !ds.IsComplete() { + log.Warn("shard is not completed") + } + + err := ds.WriteUnmasked(*kvIdx, readInputBytes()) + if err != nil { + log.Crit("write failed", "error", err) + } +} + +// rootCmd represents the base command when called without any subcommands +var rootCmd = &cobra.Command{ + Use: "sstorage", + Short: "Sharded storage tools", +} + +func init() { + rootCmd.AddCommand(CreateCmd) + rootCmd.AddCommand(ChunkReadCmd) + rootCmd.AddCommand(ChunkWriteCmd) + rootCmd.AddCommand(ShardReadCmd) + rootCmd.AddCommand(ShardWriteCmd) +} + +func main() { + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} diff --git a/sstorage/data_file.go b/sstorage/data_file.go new file mode 100644 index 000000000000..6fb051fffeb7 --- /dev/null +++ b/sstorage/data_file.go @@ -0,0 +1,257 @@ +package sstorage + +import ( + "bytes" + "encoding/binary" + "fmt" + "os" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" +) + +const ( + NO_MASK = iota + MASK_KECCAK_256 = NO_MASK + 1 + MASK_END = MASK_KECCAK_256 + // TODO: randomx + + // keccak256(b'Web3Q Large Storage')[0:8] + MAGIC = uint64(0xcf20bd770c22b2e1) + VERSION = uint64(1) + + CHUNK_SIZE = uint64(4096) +) + +type DataFile struct { + file *os.File + chunkIdxStart uint64 + chunkIdxLen uint64 + maskType uint64 +} + +type DataFileHeader struct { + magic uint64 + version uint64 + chunkIdxStart uint64 + chunkIdxLen uint64 + maskType uint64 + status uint64 +} + +func getMaskData(chunkIdx uint64, maskType uint64) []byte { + if maskType > MASK_END { + panic("unsupported mask type") + } + + if maskType == NO_MASK { + return bytes.Repeat([]byte{0}, int(CHUNK_SIZE)) + } + + seed := make([]byte, 16) + binary.BigEndian.PutUint64(seed, MAGIC) + binary.BigEndian.PutUint64(seed[8:], chunkIdx) + bs := crypto.Keccak256(seed) + return bytes.Repeat(bs, int(CHUNK_SIZE)/len(bs)) +} + +// Mask the data in place +func MaskDataInPlace(maskData []byte, userData []byte) []byte { + if len(userData) > len(maskData) { + panic("user data can not be larger than mask data") + } + for i := 0; i < len(userData); i++ { + maskData[i] = maskData[i] ^ userData[i] + } + return maskData +} + +// Unmask the data in place +func UnmaskDataInPlace(userData []byte, maskData []byte) []byte { + if len(userData) > len(maskData) { + panic("user data can not be larger than mask data") + } + for i := 0; i < len(userData); i++ { + userData[i] = maskData[i] ^ userData[i] + } + return userData +} + +func Create(filename string, chunkIdxStart uint64, chunkIdxLen uint64, maskType uint64) (*DataFile, error) { + log.Info("Creating file", "filename", filename) + file, err := os.Create(filename) + if err != nil { + return nil, err + } + for i := uint64(0); i < chunkIdxLen; i++ { + chunkIdx := chunkIdxStart + i + _, err := file.WriteAt(getMaskData(chunkIdx, maskType), int64((chunkIdx+1)*CHUNK_SIZE)) + if err != nil { + return nil, err + } + } + dataFile := &DataFile{ + file: file, + chunkIdxStart: chunkIdxStart, + chunkIdxLen: chunkIdxLen, + maskType: maskType, + } + dataFile.writeHeader() + return dataFile, nil +} + +func OpenDataFile(filename string) (*DataFile, error) { + file, err := os.OpenFile(filename, os.O_RDWR, 0755) + if err != nil { + return nil, err + } + dataFile := &DataFile{ + file: file, + } + return dataFile, dataFile.readHeader() +} + +func (df *DataFile) Contains(chunkIdx uint64) bool { + return chunkIdx >= df.chunkIdxStart && chunkIdx < df.ChunkIdxEnd() +} + +func (df *DataFile) ChunkIdxEnd() uint64 { + return df.chunkIdxStart + df.chunkIdxLen +} + +// Reads the raw data without unmasking +func (df *DataFile) ReadMasked(chunkIdx uint64) ([]byte, error) { + if !df.Contains(chunkIdx) { + return nil, fmt.Errorf("chunk not found") + } + md := make([]byte, CHUNK_SIZE) + n, err := df.file.ReadAt(md, int64(chunkIdx+1)*int64(CHUNK_SIZE)) + if err != nil { + return nil, err + } + if n != int(CHUNK_SIZE) { + return nil, fmt.Errorf("not full read") + } + return md, nil +} + +func (df *DataFile) ReadUnmasked(chunkIdx uint64, len int) ([]byte, error) { + if !df.Contains(chunkIdx) { + return nil, fmt.Errorf("chunk not found") + } + ud := make([]byte, len) + n, err := df.file.ReadAt(ud, int64(chunkIdx+1)*int64(CHUNK_SIZE)) + if err != nil { + return nil, err + } + if n != len { + return nil, fmt.Errorf("not full read") + } + return UnmaskDataInPlace(ud, getMaskData(chunkIdx, df.maskType)), nil +} + +func (df *DataFile) WriteUnmasked(chunkIdx uint64, b []byte) error { + if !df.Contains(chunkIdx) { + return fmt.Errorf("chunk not found") + } + + if len(b) > int(CHUNK_SIZE) { + return fmt.Errorf("write data too large") + } + + md := MaskDataInPlace(getMaskData(chunkIdx, df.maskType), b) + _, err := df.file.WriteAt(md, int64(chunkIdx+1)*int64(CHUNK_SIZE)) + return err +} + +func (df *DataFile) writeHeader() error { + header := DataFileHeader{ + magic: MAGIC, + version: VERSION, + chunkIdxStart: df.chunkIdxStart, + chunkIdxLen: df.chunkIdxLen, + maskType: df.maskType, + status: 0, + } + + buf := new(bytes.Buffer) + if err := binary.Write(buf, binary.BigEndian, header.magic); err != nil { + return err + } + if err := binary.Write(buf, binary.BigEndian, header.version); err != nil { + return err + } + if err := binary.Write(buf, binary.BigEndian, header.chunkIdxStart); err != nil { + return err + } + if err := binary.Write(buf, binary.BigEndian, header.chunkIdxLen); err != nil { + return err + } + if err := binary.Write(buf, binary.BigEndian, header.maskType); err != nil { + return err + } + if err := binary.Write(buf, binary.BigEndian, header.status); err != nil { + return err + } + if _, err := df.file.WriteAt(buf.Bytes(), 0); err != nil { + return err + } + return nil +} + +func (df *DataFile) readHeader() error { + header := DataFileHeader{ + magic: MAGIC, + version: VERSION, + chunkIdxStart: df.chunkIdxStart, + chunkIdxLen: df.chunkIdxLen, + maskType: df.maskType, + status: 0, + } + + b := make([]byte, CHUNK_SIZE) + n, err := df.file.ReadAt(b, 0) + if err != nil { + return err + } + if n != int(CHUNK_SIZE) { + return fmt.Errorf("not full header read") + } + + buf := bytes.NewBuffer(b) + if err := binary.Read(buf, binary.BigEndian, &header.magic); err != nil { + return err + } + if err := binary.Read(buf, binary.BigEndian, &header.version); err != nil { + return err + } + if err := binary.Read(buf, binary.BigEndian, &header.chunkIdxStart); err != nil { + return err + } + if err := binary.Read(buf, binary.BigEndian, &header.chunkIdxLen); err != nil { + return err + } + if err := binary.Read(buf, binary.BigEndian, &header.maskType); err != nil { + return err + } + if err := binary.Read(buf, binary.BigEndian, &header.status); err != nil { + return err + } + + // Sanity check + if header.magic != MAGIC { + return fmt.Errorf("magic error") + } + if header.version > VERSION { + return fmt.Errorf("unsupported version") + } + if header.maskType > MASK_END { + return fmt.Errorf("unknown mask type") + } + + df.chunkIdxStart = header.chunkIdxStart + df.chunkIdxLen = header.chunkIdxLen + df.maskType = header.maskType + + return nil +} diff --git a/sstorage/data_shard.go b/sstorage/data_shard.go new file mode 100644 index 000000000000..c44df40da12e --- /dev/null +++ b/sstorage/data_shard.go @@ -0,0 +1,144 @@ +package sstorage + +import "fmt" + +type DataShard struct { + shardIdx uint64 + kvSize uint64 + chunksPerKv uint64 + kvEntries uint64 + dataFiles []*DataFile +} + +func NewDataShard(shardIdx uint64, kvSize uint64, kvEntries uint64) *DataShard { + if kvSize%CHUNK_SIZE != 0 { + panic("kvSize must be CHUNK_SIZE at the moment") + } + + return &DataShard{shardIdx: shardIdx, kvSize: kvSize, chunksPerKv: kvSize / CHUNK_SIZE, kvEntries: kvEntries} +} + +func (ds *DataShard) AddDataFile(df *DataFile) { + // TODO: May check if not overlapped? + ds.dataFiles = append(ds.dataFiles, df) +} + +// Returns whether the shard has all data files to cover all entries +func (ds *DataShard) IsComplete() bool { + chunkIdx := ds.ChunkIdx() + chunkIdxEnd := (ds.shardIdx + 1) * ds.chunksPerKv * ds.kvEntries + for chunkIdx < chunkIdxEnd { + found := false + for _, df := range ds.dataFiles { + if df.Contains(chunkIdx) { + chunkIdx = df.ChunkIdxEnd() + found = true + } + } + if !found { + return false + } + } + return true +} + +func (ds *DataShard) Contains(kvIdx uint64) bool { + return kvIdx >= ds.shardIdx*ds.kvEntries && kvIdx < (ds.shardIdx+1)*ds.kvEntries +} + +func (ds *DataShard) ChunkIdx() uint64 { + return ds.shardIdx * ds.chunksPerKv * ds.kvEntries +} + +func (ds *DataShard) ReadMasked(kvIdx uint64) ([]byte, error) { + if !ds.Contains(kvIdx) { + return nil, fmt.Errorf("kv not found") + } + var data []byte + for i := uint64(0); i < ds.chunksPerKv; i++ { + chunkIdx := ds.ChunkIdx() + kvIdx*ds.chunksPerKv + i + cdata, err := ds.ReadChunkMasked(chunkIdx) + if err != nil { + return nil, err + } + data = append(data, cdata...) + } + return data, nil +} + +func (ds *DataShard) ReadUnmasked(kvIdx uint64, readLen int) ([]byte, error) { + if !ds.Contains(kvIdx) { + return nil, fmt.Errorf("kv not found") + } + var data []byte + for i := uint64(0); i < ds.chunksPerKv; i++ { + if readLen == 0 { + break + } + + chunkReadLen := readLen + if chunkReadLen > int(CHUNK_SIZE) { + chunkReadLen = int(CHUNK_SIZE) + } + readLen = readLen - chunkReadLen + + chunkIdx := ds.ChunkIdx() + kvIdx*ds.chunksPerKv + i + cdata, err := ds.ReadChunkUnmasked(chunkIdx, chunkReadLen) + if err != nil { + return nil, err + } + data = append(data, cdata...) + } + return data, nil +} + +func (ds *DataShard) WriteUnmasked(kvIdx uint64, b []byte) error { + if !ds.Contains(kvIdx) { + return fmt.Errorf("kv not found") + } + + for i := uint64(0); i < ds.chunksPerKv; i++ { + off := int(i * CHUNK_SIZE) + if off >= len(b) { + break + } + writeLen := len(b) - off + if writeLen > int(CHUNK_SIZE) { + writeLen = int(CHUNK_SIZE) + } + + chunkIdx := ds.ChunkIdx() + kvIdx*ds.chunksPerKv + i + err := ds.WriteChunkUnmasked(chunkIdx, b[off:off+writeLen]) + if err != nil { + return nil + } + } + return nil +} + +func (ds *DataShard) ReadChunkMasked(chunkIdx uint64) ([]byte, error) { + for _, df := range ds.dataFiles { + if df.Contains(chunkIdx) { + return df.ReadMasked(chunkIdx) + } + } + return nil, fmt.Errorf("chunk not found: the shard is not completed?") +} + +func (ds *DataShard) ReadChunkUnmasked(chunkIdx uint64, readLen int) ([]byte, error) { + for _, df := range ds.dataFiles { + if df.Contains(chunkIdx) { + return df.ReadUnmasked(chunkIdx, readLen) + } + } + return nil, fmt.Errorf("chunk not found: the shard is not completed?") +} + +func (ds *DataShard) WriteChunkUnmasked(chunkIdx uint64, b []byte) error { + for _, df := range ds.dataFiles { + if df.Contains(chunkIdx) { + return df.WriteUnmasked(chunkIdx, b) + } + } + return fmt.Errorf("chunk not found: the shard is not completed?") +} diff --git a/sstorage/shard_config.go b/sstorage/shard_config.go new file mode 100644 index 000000000000..2c31869a5580 --- /dev/null +++ b/sstorage/shard_config.go @@ -0,0 +1,113 @@ +package sstorage + +import ( + "fmt" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/common" +) + +// TODO: move to config? +var ContractToShardManager = make(map[common.Address]*ShardManager) + +type ShardInfo struct { + Contract common.Address + KVSize uint64 + KVEntries uint64 +} + +// TODO: move to chain specific config? +var ShardInfos = []*ShardInfo{ + {common.HexToAddress("0x1234"), 4 * 1024, 32 * 1024 * 1024}, +} + +func InitializeConfig() { + for _, sinfo := range ShardInfos { + ContractToShardManager[sinfo.Contract] = NewShardManager(sinfo.Contract, sinfo.KVSize, sinfo.KVEntries) + } +} + +func findShardManaager(kvSize uint64) *ShardManager { + for _, v := range ContractToShardManager { + if v.kvSize == kvSize { + return v + } + } + return nil +} + +func parseKvSize(s string) (uint64, error) { + if s[len(s)] == 'k' || s[len(s)] == 'K' { + if v, err := strconv.Atoi(s[0 : len(s)-1]); err != nil { + return 0, err + } else { + return uint64(v) * 1024, nil + } + } else { + if v, err := strconv.Atoi(s); err != nil { + return 0, err + } else { + return uint64(v), nil + } + } +} + +func AddDataShardFromConfig(cfg string) error { + // Format is kvSize,shardIdx + ss := strings.Split(cfg, ",") + if len(ss) != 2 || len(ss[0]) == 0 || len(ss[1]) == 0 { + return fmt.Errorf("incorrect data shard cfg") + } + + kvSize, err := parseKvSize(ss[0]) + if err != nil { + return err + } + var shardIdx uint64 + + sm := findShardManaager(kvSize) + if sm == nil { + return fmt.Errorf("shard with kv size %d not found", kvSize) + } + + if v, err := strconv.Atoi(ss[1]); err != nil { + return err + } else { + shardIdx = uint64(v) + } + return sm.AddDataShard(shardIdx) +} + +func AddDataFileFromConfig(cfg string) error { + // Format is kvSize,dataFile + ss := strings.Split(cfg, ",") + if len(ss) != 2 || len(ss[0]) == 0 || len(ss[1]) == 0 { + return fmt.Errorf("incorrect data shard cfg") + } + + kvSize, err := parseKvSize(ss[0]) + if err != nil { + return err + } + + sm := findShardManaager(kvSize) + if sm == nil { + return fmt.Errorf("shard with kv size %d not found", kvSize) + } + + df, err := OpenDataFile(ss[1]) + if err != nil { + return err + } + return sm.AddDataFile(df) +} + +func IsComplete() error { + for _, sm := range ContractToShardManager { + if err := sm.IsComplete(); err != nil { + return err + } + } + return nil +} diff --git a/sstorage/shard_manager.go b/sstorage/shard_manager.go new file mode 100644 index 000000000000..a2b8190af61b --- /dev/null +++ b/sstorage/shard_manager.go @@ -0,0 +1,79 @@ +package sstorage + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" +) + +type ShardManager struct { + shardMap map[uint64]*DataShard + contractAddress common.Address + kvSize uint64 + chunksPerKv uint64 + kvEntries uint64 +} + +func NewShardManager(contractAddress common.Address, kvSize uint64, kvEntries uint64) *ShardManager { + return &ShardManager{ + shardMap: make(map[uint64]*DataShard), + contractAddress: contractAddress, + kvSize: kvSize, + chunksPerKv: kvSize / CHUNK_SIZE, + kvEntries: kvEntries, + } +} + +func (sm *ShardManager) MaxKvSize() uint64 { + return sm.kvSize +} + +func (sm *ShardManager) AddDataShard(shardIdx uint64) error { + if _, ok := sm.shardMap[shardIdx]; !ok { + ds := NewDataShard(shardIdx, sm.kvSize, sm.kvEntries) + sm.shardMap[shardIdx] = ds + return nil + } else { + return fmt.Errorf("data shard already exists") + } +} + +func (sm *ShardManager) AddDataFile(df *DataFile) error { + shardIdx := df.chunkIdxStart / sm.chunksPerKv / sm.kvEntries + var ds *DataShard + var ok bool + if ds, ok = sm.shardMap[shardIdx]; !ok { + return fmt.Errorf("data shard not found") + } + + ds.AddDataFile(df) + return nil +} + +func (sm *ShardManager) TryWrite(kvIdx uint64, b []byte) (bool, error) { + shardIdx := kvIdx / sm.kvEntries + if ds, ok := sm.shardMap[shardIdx]; ok { + return true, ds.WriteUnmasked(kvIdx, b) + } else { + return false, nil + } +} + +func (sm *ShardManager) TryRead(kvIdx uint64, readLen int) ([]byte, bool, error) { + shardIdx := kvIdx / sm.kvEntries + if ds, ok := sm.shardMap[shardIdx]; ok { + b, err := ds.ReadUnmasked(kvIdx, readLen) + return b, true, err + } else { + return nil, false, nil + } +} + +func (sm *ShardManager) IsComplete() error { + for _, ds := range sm.shardMap { + if !ds.IsComplete() { + return fmt.Errorf("shard %d is not complete", ds.shardIdx) + } + } + return nil +}