Skip to content

Commit

Permalink
setup redis store
Browse files Browse the repository at this point in the history
  • Loading branch information
mouseless0x committed Feb 27, 2025
1 parent 108bc82 commit 1052182
Showing 1 changed file with 146 additions and 117 deletions.
263 changes: 146 additions & 117 deletions src/store/createRedisOutstandingStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,6 @@ import { HexData32, UserOperation } from "@alto/types"
import { OutstandingStore } from "."
import { Redis } from "ioredis"

// Structure
// - zset priority queue to hold outstanding userOps (scored by gasPrice and stores only sender:nonceKeyKey index)
// - redis list to store pending userOps by sender

// Used to keep track of ops ready for bundling (sorted by gasPrice, and includes lowest nonce for every sender:nonceKeyKey pair)
const prioriyQueueKey = (chainId: number) =>
`${chainId}:outstanding:priority-queue`

// Used to keep track of pending ops by sender:nonceKeyKey pair
const senderNonceKeyIndex = (userOp: UserOperation, chainId: number) => {
const sender = userOp.sender
const [nonceKey] = getNonceKeyAndSequence(userOp.nonce)
const fingerPrint = `${sender}-${nonceKey}`
return `${chainId}:outstanding:pending-ops:${fingerPrint}`
}

// Used to keep track of active sender:nonceKey pair slots (for easier cleanup + finding how many ops are pending)
const senderNonceKeySlotsKey = (chainId: number) =>
`${chainId}:outstanding:slots`

const serializeUserOpInfo = (userOpInfo: UserOpInfo): string => {
return JSON.stringify(userOpInfo, (_, value) =>
typeof value === "bigint" ? value.toString() : value
Expand All @@ -36,103 +16,159 @@ const deserializeUserOpInfo = (data: string): UserOpInfo => {
return userOpInfoSchema.parse(parsed)
}

const createRedisKeys = (chainId: number) => {
return {
// Used to keep track of ops ready for bundling
// - Sorted by gasPrice
// - Only includes lowest nonce for every (sender, nonceKeyKey) pair
// - Stores (sender, nonceKey) redis key
readyOpsQueue: () => {
return `${chainId}:outstanding:pending-queue`
},

// Used to keep track of pending ops by (sender, nonceKey) pair
// - Each key stores a list of pending userOpInfo for that (sender, nonceKey) pair
// - Sorted by nonceSeq
pendingOps: (userOp: UserOperation) => {
const sender = userOp.sender
const [nonceKey] = getNonceKeyAndSequence(userOp.nonce)
const fingerPrint = `${sender}-${nonceKey}`
return `${chainId}:outstanding:pending-ops:${fingerPrint}`
},

// Used to keep track of active (sender, nonceKey) pair queue slots
// - Used for easier cleanup + finding how many ops are pending (+dumping the entire outstanding store)
pendingsOpsIndexList: () => {
return `${chainId}:outstanding:slots`
},

// Secondary index to map userOpHash to pendingOpsKey (used for easy lookup when calling outstanding.remove())
userOpHashLookup: () => {
return `${chainId}:outstanding:user-op-hash-index`
}
}
}

export const createRedisOutstandingQueue = ({
config
}: {
config: AltoConfig
}): OutstandingStore => {
const chainId = config.chainId
const logger = config.getLogger(
{ module: "redis-outstanding-queue" },
{
level: config.logLevel
}
)

const { redisMempoolUrl } = config
if (!redisMempoolUrl) {
throw new Error("missing required redisMempoolUrl")
}

const redisClient = new Redis(redisMempoolUrl)
const redisKeys = createRedisKeys(chainId)

// Adds userOp to queue and maintains sorting by gas price
const addToPriorityQueue = async (userOpInfo: UserOpInfo) => {
const addToReadyOpsQueue = async (userOpInfo: UserOpInfo) => {
const { userOp } = userOpInfo
const senderNonceKey = senderNonceKeyIndex(userOp, chainId)
const pendingOpsKey = redisKeys.pendingOps(userOp)

await redisClient.zadd(
prioriyQueueKey(chainId),
redisKeys.readyOpsQueue(),
Number(userOpInfo.userOp.maxFeePerGas), // Score
senderNonceKey
pendingOpsKey
)
}

// Add userOpInfo to sender:nonceKeyKey queue
const addToSenderNonceKeyQueue = async (userOpInfo: UserOpInfo) => {
const addToPendingOpsQueue = async (userOpInfo: UserOpInfo) => {
const { userOp } = userOpInfo
const [, nonceSeq] = getNonceKeyAndSequence(userOp.nonce)
const senderNonceKey = senderNonceKeyIndex(userOp, chainId)
const pendingOpsKey = redisKeys.pendingOps(userOp)

// Add to the sender:nonceKey sorted set with nonceSeq as score
// Add to the (sender, nonceKey) queue with nonceSeq as score
await redisClient.zadd(
senderNonceKey,
pendingOpsKey,
Number(nonceSeq),
serializeUserOpInfo(userOpInfo)
)

// Record sender:nonceKey key
await redisClient.sadd(senderNonceKeySlotsKey(chainId), senderNonceKey)
// Record (sender, nonceKey) index
await redisClient.sadd(redisKeys.pendingsOpsIndexList(), pendingOpsKey)
}

// Returns sorted pending ops by nonceSeq for a given sender:nonceKey pair
const getSortedPendingOps = async (
userOpInfo: UserOpInfo
): Promise<UserOpInfo[]> => {
const { userOp } = userOpInfo
// Returns sorted pending ops by nonceSeq for a given (sender, nonceKey) pair
const getSortedPendingOps = async ({
userOpInfo,
pendingOpsKey
}: {
userOpInfo?: UserOpInfo
pendingOpsKey?: string
}): Promise<UserOpInfo[]> => {
if (pendingOpsKey) {
const zsetKey = pendingOpsKey
const entries = await redisClient.zrange(zsetKey, 0, -1)
return entries.map(deserializeUserOpInfo)
}

if (userOpInfo) {
const { userOp } = userOpInfo
const zsetKey = redisKeys.pendingOps(userOp)
const entries = await redisClient.zrange(zsetKey, 0, -1)
return entries.map(deserializeUserOpInfo)
}

throw new Error("missing required userOpInfo or pendingOpsKey")
}

const zsetKey = senderNonceKeyIndex(userOp, chainId)
const entries = await redisClient.zrange(zsetKey, 0, -1)
return entries.map(deserializeUserOpInfo)
// Add this helper function to maintain the secondary index
const addToUserOpHashLookup = async (userOpInfo: UserOpInfo) => {
const { userOpHash, userOp } = userOpInfo
const pendingOpsKey = redisKeys.pendingOps(userOp)

// Store mapping from userOpHash to pendingOpsKey and serialized userOpInfo
await redisClient.hset(
redisKeys.userOpHashLookup(),
userOpHash,
pendingOpsKey
)
}

return {
pop: async () => {
// Pop highest score in readyOpsQueue
const multi = redisClient.multi()
multi.zrange(prioriyQueueKey(chainId), 0, 0)
multi.zremrangebyrank(prioriyQueueKey(chainId), 0, 0)
multi.zrange(redisKeys.readyOpsQueue(), 0, 0)
multi.zremrangebyrank(redisKeys.readyOpsQueue(), 0, 0)
const results = await multi.exec()

if (
!results ||
!results[0][1] ||
(results[0][1] as string[]).length === 0
(results[0][1] as string[])?.length === 0
) {
return undefined
}

const userOpInfoStr = (results[0][1] as string[])[0]
const userOpInfo = deserializeUserOpInfo(userOpInfoStr)

// Remove this userOp from the sorted set
const senderNonceKey = senderNonceKeyIndex(
userOpInfo.userOp,
chainId
)
await redisClient.zrem(senderNonceKey, userOpInfoStr)
await redisClient
.multi()
.zrem(redisKeys.pendingOps(userOpInfo.userOp), userOpInfoStr)
.hdel(redisKeys.userOpHashLookup(), userOpInfo.userOpHash)
.exec()

// Get remaining operations for this slot
const sortedOps = await getSortedPendingOps(userOpInfo)
const sortedOps = await getSortedPendingOps({ userOpInfo })

// If there are more items, add the next one to the priority queue
if (sortedOps.length > 0) {
await addToPriorityQueue(sortedOps[0])
await addToReadyOpsQueue(sortedOps[0])
} else {
// If no more pending operations, cleanup by removing the sender:nonceKey and the slot
// If no more pending operations, cleanup by removing the (sender, nonceKey) list
await redisClient
.multi()
.del(senderNonceKey)
.srem(senderNonceKeySlotsKey(chainId), senderNonceKey)
.srem(
redisKeys.pendingsOpsIndexList(),
redisKeys.pendingOps(userOpInfo.userOp)
)
.del(redisKeys.pendingOps(userOpInfo.userOp))
.exec()
}

Expand All @@ -142,66 +178,67 @@ export const createRedisOutstandingQueue = ({
add: async (userOpInfo: UserOpInfo) => {
const { userOpHash } = userOpInfo

addToSenderNonceKeyQueue(userOpInfo)
// Add to (sender, nonceKey) queue
await addToPendingOpsQueue(userOpInfo)

// If this is the lowest nonce for sender:nonceKeyKey pair, update the priority queue
const sortedOps = await getSortedPendingOps(userOpInfo)
// Add to userOpHash index
await addToUserOpHashLookup(userOpInfo)

// If this is the lowest nonce for (sender, nonceKey) pair, update the priority queue
const sortedOps = await getSortedPendingOps({ userOpInfo })
if (
sortedOps.length > 0 &&
sortedOps[0].userOpHash === userOpHash
) {
await addToPriorityQueue(userOpInfo)
await addToReadyOpsQueue(userOpInfo)
}
},

remove: async (userOpHash: HexData32) => {
// Find the userOp in the priority queue
const priorityQueueItems = await redisClient.zrange(
prioriyQueueKey(chainId),
0,
-1
)
const priorityQueue = priorityQueueItems.map(deserializeUserOpInfo)

const userOpInfoIndex = priorityQueue.findIndex(
(info) => info.userOpHash === userOpHash
// Get the userOp info from the secondary index
const pendingOpsKey = await redisClient.hget(
redisKeys.userOpHashLookup(),
userOpHash
)

if (userOpInfoIndex === -1) {
logger.info("tried to remove non-existent user op from mempool")
if (!pendingOpsKey) {
return false
}

const userOpInfo = priorityQueue[userOpInfoIndex]
const senderNonceKey = senderNonceKeyIndex(
userOpInfo.userOp,
chainId
// Check if we are removing the lowest userOp for the (sender, nonceKey) pair
const sortedOps = await getSortedPendingOps({ pendingOpsKey })
const userOpInfo = sortedOps.find(
(sop) => sop.userOpHash === userOpHash
)

// Remove from priority queue
await redisClient.zrem(
prioriyQueueKey(chainId),
priorityQueueItems[userOpInfoIndex]
)
if (!userOpInfo) {
return false
}

// Remove from sorted set
await redisClient.zrem(
senderNonceKey,
serializeUserOpInfo(userOpInfo)
)
if (
sortedOps.length > 0 &&
sortedOps[0].userOpHash === userOpHash
) {
// If this is the lowest userOp in (sender, nonceKey) pair, remove from readyOpsQueue, userOpHashLookup, pendingOpsList
await redisClient
.multi()
.zrem(redisKeys.readyOpsQueue(), pendingOpsKey)
.hdel(redisKeys.userOpHashLookup(), userOpHash)
.zrem(pendingOpsKey, serializeUserOpInfo(userOpInfo))
.exec()

// Get remaining operations for this slot
const sortedOps = await getSortedPendingOps(userOpInfo)
sortedOps.shift()

// If there are more operations, add the first one to priority queue
if (sortedOps.length > 0) {
await addToPriorityQueue(sortedOps[0])
// If there are more operations, add the next lowest nonceSeq to readyOpsQueue
if (sortedOps.length > 0) {
await addToReadyOpsQueue(sortedOps[0])
}
} else {
// If no more operations, remove the hash and the slot
// Otherwise, delete from userOpHashLookup and remove from pendingOpsList
await redisClient
.pipeline()
.del(senderNonceKey)
.srem(senderNonceKeySlotsKey(chainId), senderNonceKey)
.multi()
.hdel(redisKeys.userOpHashLookup(), userOpHash)
.zrem(pendingOpsKey, serializeUserOpInfo(userOpInfo))
.exec()
}

Expand All @@ -211,7 +248,7 @@ export const createRedisOutstandingQueue = ({
dump: async () => {
// Get all slots
const slots = await redisClient.smembers(
senderNonceKeySlotsKey(chainId)
redisKeys.pendingsOpsIndexList()
)
const allOps: UserOpInfo[] = []

Expand All @@ -230,35 +267,27 @@ export const createRedisOutstandingQueue = ({
},

clear: async () => {
// Clear the priority queue
await redisClient.del(prioriyQueueKey(chainId))

// Get all slots
const slots = await redisClient.smembers(
senderNonceKeySlotsKey(chainId)
redisKeys.pendingsOpsIndexList()
)

if (slots.length > 0) {
const pipeline = redisClient.pipeline()

// Delete all hash keys
for (const slot of slots) {
pipeline.del(slot)
}

// Clear the slots set
pipeline.del(senderNonceKeySlotsKey(chainId))

await pipeline.exec()
const multi = redisClient.pipeline()
for (const slot of slots) {
multi.del(slot)
}

return Promise.resolve()
multi.del(redisKeys.pendingsOpsIndexList())
multi.del(redisKeys.userOpHashLookup())
multi.del(redisKeys.readyOpsQueue())

await multi.exec()
},

length: async () => {
// Count total number of operations across all slots
const slots = await redisClient.smembers(
senderNonceKeySlotsKey(chainId)
redisKeys.pendingsOpsIndexList()
)
let totalCount = 0

Expand Down

0 comments on commit 1052182

Please sign in to comment.