Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: relax indexedChainState to ChainState for retrieval #943

Open
wants to merge 35 commits into
base: master
Choose a base branch
from

Conversation

hopeyen
Copy link
Contributor

@hopeyen hopeyen commented Dec 2, 2024

Why are these changes needed?

Enable retrieval client to run without indexing

Checks

  • I've made sure the tests are passing. Note that there might be a few flaky tests, in that case, please comment that they are not relevant.
  • I've checked the new test coverage and the coverage percentage didn't drop.
  • Testing Strategy
    • Unit tests
    • Integration tests
    • This PR is not tested :(

Copy link
Contributor

@ian-shim ian-shim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move these methods to eth reader and remove this struct?
Not sure why we need a separate struct for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're caching the operator sockets, maybe we should keep this struct separate

// buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum
func (cs *ChainState) buildSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) (map[core.OperatorID]string, error) {
socketMap := make(map[core.OperatorID]string)
for _, quorum := range operatorsByQuorum {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the key the quorum and the value map[OperatorIndex]OperatorStake here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes exactly. This loop is to get all the operator ID and their socket from all quorums

Copy link
Contributor

@ian-shim ian-shim Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one way to address this would be to preload cache initially by querying every single operator in the current block. Then for the following requests, query the current set of operators, query the logs by filtering on SocketUpdate event, find any socket updates for existing operators, and refresh sockets for all operators.
Then the subsequent requests will only cost 2 eth calls

@ian-shim ian-shim mentioned this pull request Dec 20, 2024
5 tasks
@@ -38,7 +45,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber))
socketMap, err := cs.buildSocketMap(ctx, operatorsByQuorum)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean whenever we call this method, it will query the chain # operators times?
this seems very inefficient, should we cache this?

Copy link
Contributor Author

@hopeyen hopeyen Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I expect this too. I can add socket map to the ChainState cache and only query if the operator is missing from the socket map. Although, that means we might need to add a refresh or a failover somewhere so that we don't use the old socket addresses after operators make an updater. Lmk what you think

@@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're caching the operator sockets, maybe we should keep this struct separate

@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch from b8566be to 59b7ff7 Compare December 20, 2024 03:09
@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch 2 times, most recently from d393a7e to 36b9280 Compare January 3, 2025 06:40
@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch from f473ed5 to ac07b0c Compare February 11, 2025 15:43
@litt3
Copy link
Contributor

litt3 commented Feb 11, 2025

This PR re-adds an old file, contracts/bindings/EigenDABlobVerifier/binding.go. Presumably this was a merge conflict- this file can be deleted

@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch from 71ad17b to f0508b7 Compare February 13, 2025 00:05
@litt3 litt3 requested a review from anupsv February 19, 2025 18:07
Copy link
Contributor

@ian-shim ian-shim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like integrating the operator socket into opInfo is taking a bigger footprint and may be introducing more risk given that this information isn't needed in the existing ChainState use cases. This is primarily used to replace IndexedOperatorInfo in the api clients. Can we consider decoupling this client that fetches operator socket from the ChainState?

@@ -500,6 +500,12 @@ func (t *Reader) SocketRegistry(ctx context.Context) (gethcommon.Address, error)
})
}

func (t *Reader) RegistryCoordinator(ctx context.Context) (gethcommon.Address, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already have t.bindings.RegCoordinatorAddr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this function was added for ChainState to filter socket update event logs. Are you suggesting to just return t.bindings.RegCoordinatorAddr for this function? I thought this way is more standard looking at other address getter functions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless the RegistryCoordinator address is updated, we don't need an extra RPC call, right?

prematureBreak := false
for _, log := range logs {
cs.socketPrevBlockNumber.Store(uint32(log.BlockNumber - 1))
transaction, err := cs.getTransaction(ctx, log.TxHash)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to fetch the transaction again? Don't we get both operator ID and socket from the logs?

cs.socketMu.Unlock()

if !ok {
socket, err := cs.Reader.GetOperatorSocket(ctx, operator.OperatorID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making individual calls to get each operator socket seems brittle since it would be hundreds of calls in short bursts, and this method returns error if a single call fails
Have you considered using a batch call instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add an issue of this and improve later?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decouple this operator socket retrieval logic from the ChainState as suggested here and only use this in api clients, I'd feel more comfortable improving this later vs. now


// indexSocketMap filters event logs from the previously checked block number to the current block,
// to identify all socket update events in that block range, and update the socket map accordingly
func (cs *ChainState) indexSocketMap(ctx context.Context) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add tests to these methods? Methods that simply make onchain calls are fine, but some involve non trivial logic

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

// Mutex to access socket map
socketMu sync.Mutex
// The previous block number the socket map was updated at, inclusive
socketPrevBlockNumber atomic.Uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't appear to need to be atomic any more, since access is fully contained within socketMu


// parseOperatorIDFromEventLog parses the operator ID from a log and returns the operator ID.
func (cs *ChainState) parseOperatorIDFromEventLog(log *types.Log) (core.OperatorID, error) {
if len(log.Topics) < 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should len(log.Topics) be exactly 2?

cs.socketMu.Unlock()

if !ok {
socket, err := cs.Reader.GetOperatorSocket(ctx, operator.OperatorID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decouple this operator socket retrieval logic from the ChainState as suggested here and only use this in api clients, I'd feel more comfortable improving this later vs. now

@@ -500,6 +500,12 @@ func (t *Reader) SocketRegistry(ctx context.Context) (gethcommon.Address, error)
})
}

func (t *Reader) RegistryCoordinator(ctx context.Context) (gethcommon.Address, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless the RegistryCoordinator address is updated, we don't need an extra RPC call, right?

@hopeyen
Copy link
Contributor Author

hopeyen commented Feb 24, 2025

I feel like integrating the operator socket into opInfo is taking a bigger footprint and may be introducing more risk given that this information isn't needed in the existing ChainState use cases. This is primarily used to replace IndexedOperatorInfo in the api clients. Can we consider decoupling this client that fetches operator socket from the ChainState?

Agree that this is taking a bigger footprint than what we originally intended. Stepping back to the original goal of this PR, is to utilize the on-chain storage of operator socket, and remove dependency of TheGraph indexer for retrieving the socket.

Recalling the first iteration of the PR, there's no caching or indexing, and simply calls the contract for the socket. We spoke about the concern then, was too many unnecessary RPC calls since GetOperatorState is called for many components other than the retrieval client, thus we started writing the cache/indexing. Some of the code looks more and more like our existing indexer component. I think we got caught up reinventing the indexer wheel, maybe?

What would you think if I remove all the later changes and go back to the first iteration, and instead add a separate function GetOperatorSockets, and only have retrieval client calls it? Preferably, let's not worry about the indexing in this PR, and try to optimize if RPC calls for the sockets turned out to be a problem for the retrieval client?

@ian-shim
Copy link
Contributor

I think the caching here is definitely useful even on the client side. Why don't we just factor out the operator socket related methods to a separate component including all methods you implemented and leave the ChainState unchanged? This way the existing functionality of ChainState remain unchanged but only API clients do. Any thoughts?

}
totalStake.Add(totalStake, op.Stake)
}

totals[quorumID] = &core.OperatorInfo{
Stake: totalStake,
Index: core.OperatorIndex(len(quorum)),
// no socket for the total
Socket: "",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this even a case ? Can someone register as operator without a socket ?

return "", fmt.Errorf("unexpected number of values in event data")
}

socket, ok := values[0].(string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to check if the socket is mapped with host:port and not something like host:port:port or some bad string unless this is enforced on contract side. The other is, we need to check if the socket ip is some private ip like in 172.x or 192.x or localhost, we should reject. Should also be handled contract side as well.

@litt3
Copy link
Contributor

litt3 commented Feb 24, 2025

@hopeyen I'm looking into log filtering for a different task, and my exploration led me to a question about your implementation here:

Is there a reason you are manually implementing the log filtering logic, instead of using the autogenerated binding method here? It seems that should do what you've manually implemented here, and also allow you to avoid adding the huge RegistryCoordinator.json file. Am I missing something?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants