-
Notifications
You must be signed in to change notification settings - Fork 208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: relax indexedChainState to ChainState for retrieval #943
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
core/eth/state.go
Outdated
@@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe | |||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move these methods to eth reader and remove this struct?
Not sure why we need a separate struct for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're caching the operator sockets, maybe we should keep this struct separate
// buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum | ||
func (cs *ChainState) buildSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) (map[core.OperatorID]string, error) { | ||
socketMap := make(map[core.OperatorID]string) | ||
for _, quorum := range operatorsByQuorum { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't the key the quorum and the value map[OperatorIndex]OperatorStake
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes exactly. This loop is to get all the operator ID and their socket from all quorums
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one way to address this would be to preload cache initially by querying every single operator in the current block. Then for the following requests, query the current set of operators, query the logs by filtering on SocketUpdate
event, find any socket updates for existing operators, and refresh sockets for all operators.
Then the subsequent requests will only cost 2 eth calls
core/eth/state.go
Outdated
@@ -38,7 +45,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu | |||
return nil, err | |||
} | |||
|
|||
return getOperatorState(operatorsByQuorum, uint32(blockNumber)) | |||
socketMap, err := cs.buildSocketMap(ctx, operatorsByQuorum) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this mean whenever we call this method, it will query the chain # operators
times?
this seems very inefficient, should we cache this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes I expect this too. I can add socket map to the ChainState cache and only query if the operator is missing from the socket map. Although, that means we might need to add a refresh or a failover somewhere so that we don't use the old socket addresses after operators make an updater. Lmk what you think
core/eth/state.go
Outdated
@@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe | |||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're caching the operator sockets, maybe we should keep this struct separate
b8566be
to
59b7ff7
Compare
d393a7e
to
36b9280
Compare
f473ed5
to
ac07b0c
Compare
This PR re-adds an old file, |
71ad17b
to
f0508b7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like integrating the operator socket into opInfo
is taking a bigger footprint and may be introducing more risk given that this information isn't needed in the existing ChainState
use cases. This is primarily used to replace IndexedOperatorInfo
in the api clients. Can we consider decoupling this client that fetches operator socket from the ChainState
?
core/eth/reader.go
Outdated
@@ -500,6 +500,12 @@ func (t *Reader) SocketRegistry(ctx context.Context) (gethcommon.Address, error) | |||
}) | |||
} | |||
|
|||
func (t *Reader) RegistryCoordinator(ctx context.Context) (gethcommon.Address, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we already have t.bindings.RegCoordinatorAddr
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this function was added for ChainState to filter socket update event logs. Are you suggesting to just return t.bindings.RegCoordinatorAddr
for this function? I thought this way is more standard looking at other address getter functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless the RegistryCoordinator address is updated, we don't need an extra RPC call, right?
core/eth/state.go
Outdated
prematureBreak := false | ||
for _, log := range logs { | ||
cs.socketPrevBlockNumber.Store(uint32(log.BlockNumber - 1)) | ||
transaction, err := cs.getTransaction(ctx, log.TxHash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to fetch the transaction again? Don't we get both operator ID and socket from the logs?
cs.socketMu.Unlock() | ||
|
||
if !ok { | ||
socket, err := cs.Reader.GetOperatorSocket(ctx, operator.OperatorID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making individual calls to get each operator socket seems brittle since it would be hundreds of calls in short bursts, and this method returns error if a single call fails
Have you considered using a batch call instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add an issue of this and improve later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we decouple this operator socket retrieval logic from the ChainState
as suggested here and only use this in api clients, I'd feel more comfortable improving this later vs. now
|
||
// indexSocketMap filters event logs from the previously checked block number to the current block, | ||
// to identify all socket update events in that block range, and update the socket map accordingly | ||
func (cs *ChainState) indexSocketMap(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add tests to these methods? Methods that simply make onchain calls are fine, but some involve non trivial logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
// Mutex to access socket map | ||
socketMu sync.Mutex | ||
// The previous block number the socket map was updated at, inclusive | ||
socketPrevBlockNumber atomic.Uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't appear to need to be atomic any more, since access is fully contained within socketMu
core/eth/state.go
Outdated
|
||
// parseOperatorIDFromEventLog parses the operator ID from a log and returns the operator ID. | ||
func (cs *ChainState) parseOperatorIDFromEventLog(log *types.Log) (core.OperatorID, error) { | ||
if len(log.Topics) < 2 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should len(log.Topics)
be exactly 2?
cs.socketMu.Unlock() | ||
|
||
if !ok { | ||
socket, err := cs.Reader.GetOperatorSocket(ctx, operator.OperatorID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we decouple this operator socket retrieval logic from the ChainState
as suggested here and only use this in api clients, I'd feel more comfortable improving this later vs. now
core/eth/reader.go
Outdated
@@ -500,6 +500,12 @@ func (t *Reader) SocketRegistry(ctx context.Context) (gethcommon.Address, error) | |||
}) | |||
} | |||
|
|||
func (t *Reader) RegistryCoordinator(ctx context.Context) (gethcommon.Address, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless the RegistryCoordinator address is updated, we don't need an extra RPC call, right?
Agree that this is taking a bigger footprint than what we originally intended. Stepping back to the original goal of this PR, is to utilize the on-chain storage of operator socket, and remove dependency of TheGraph indexer for retrieving the socket. Recalling the first iteration of the PR, there's no caching or indexing, and simply calls the contract for the socket. We spoke about the concern then, was too many unnecessary RPC calls since What would you think if I remove all the later changes and go back to the first iteration, and instead add a separate function |
I think the caching here is definitely useful even on the client side. Why don't we just factor out the operator socket related methods to a separate component including all methods you implemented and leave the ChainState unchanged? This way the existing functionality of ChainState remain unchanged but only API clients do. Any thoughts? |
} | ||
totalStake.Add(totalStake, op.Stake) | ||
} | ||
|
||
totals[quorumID] = &core.OperatorInfo{ | ||
Stake: totalStake, | ||
Index: core.OperatorIndex(len(quorum)), | ||
// no socket for the total | ||
Socket: "", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this even a case ? Can someone register as operator without a socket ?
core/eth/state.go
Outdated
return "", fmt.Errorf("unexpected number of values in event data") | ||
} | ||
|
||
socket, ok := values[0].(string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would need to check if the socket is mapped with host:port and not something like host:port:port or some bad string unless this is enforced on contract side. The other is, we need to check if the socket ip is some private ip like in 172.x or 192.x or localhost, we should reject. Should also be handled contract side as well.
@hopeyen I'm looking into log filtering for a different task, and my exploration led me to a question about your implementation here: Is there a reason you are manually implementing the log filtering logic, instead of using the autogenerated binding method here? It seems that should do what you've manually implemented here, and also allow you to avoid adding the huge |
Why are these changes needed?
Enable retrieval client to run without indexing
Checks