Skip to content

Commit

Permalink
Merge branch 'develop' into mardizzone/pos-493
Browse files Browse the repository at this point in the history
  • Loading branch information
marcello33 authored Jun 21, 2022
2 parents e8a56da + 3f2d53c commit a00443a
Show file tree
Hide file tree
Showing 21 changed files with 821 additions and 450 deletions.
3 changes: 1 addition & 2 deletions bor/beginblocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
hmTypes "github.com/maticnetwork/heimdall/types"
)

func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k Keeper) {

func BeginBlocker(ctx sdk.Context, _ abci.RequestBeginBlock, k Keeper) {
if ctx.BlockHeight() == int64(helper.SpanOverrideBlockHeight) {
k.Logger(ctx).Info("overriding span BeginBlocker", "height", ctx.BlockHeight())
j, ok := rest.SPAN_OVERRIDES[helper.GenesisDoc.ChainID]
Expand Down
2 changes: 1 addition & 1 deletion bor/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func HandleMsgProposeSpan(ctx sdk.Context, msg types.MsgProposeSpan, k Keeper) s
spanDuration := k.GetParams(ctx).SpanDuration
if spanDuration != (msg.EndBlock - msg.StartBlock + 1) {
k.Logger(ctx).Error("Span duration of proposed span is wrong",
"proposedSpanDuration", (msg.EndBlock - msg.StartBlock + 1),
"proposedSpanDuration", msg.EndBlock-msg.StartBlock+1,
"paramsSpanDuration", spanDuration,
)
return common.ErrInvalidSpanDuration(k.Codespace()).Result()
Expand Down
18 changes: 8 additions & 10 deletions bor/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/maticnetwork/bor/common"
"github.com/maticnetwork/heimdall/bor/types"
chainmanager "github.com/maticnetwork/heimdall/chainmanager"
"github.com/maticnetwork/heimdall/chainmanager"
"github.com/maticnetwork/heimdall/helper"
"github.com/maticnetwork/heimdall/params/subspace"
"github.com/maticnetwork/heimdall/staking"
Expand Down Expand Up @@ -45,7 +45,7 @@ type Keeper struct {
chainKeeper chainmanager.Keeper
}

// NewKeeper create new keeper
// NewKeeper is the constructor of Keeper
func NewKeeper(
cdc *codec.Codec,
storeKey sdk.StoreKey,
Expand All @@ -55,8 +55,7 @@ func NewKeeper(
stakingKeeper staking.Keeper,
caller helper.ContractCaller,
) Keeper {
// create keeper
keeper := Keeper{
return Keeper{
cdc: cdc,
storeKey: storeKey,
paramSpace: paramSpace.WithKeyTable(types.ParamKeyTable()),
Expand All @@ -65,7 +64,6 @@ func NewKeeper(
sk: stakingKeeper,
contractCaller: caller,
}
return keeper
}

// Codespace returns the codespace
Expand Down Expand Up @@ -152,9 +150,6 @@ func (k *Keeper) GetAllSpans(ctx sdk.Context) (spans []*hmTypes.Span) {
func (k *Keeper) GetSpanList(ctx sdk.Context, page uint64, limit uint64) ([]hmTypes.Span, error) {
store := ctx.KVStore(k.storeKey)

// create spans
var spans []hmTypes.Span

// have max limit
if limit > 20 {
limit = 20
Expand All @@ -164,6 +159,7 @@ func (k *Keeper) GetSpanList(ctx sdk.Context, page uint64, limit uint64) ([]hmTy
iterator := hmTypes.KVStorePrefixIteratorPaginated(store, SpanPrefixKey, uint(page), uint(limit))

// loop through validators to get valid validators
var spans []hmTypes.Span
for ; iterator.Valid(); iterator.Next() {
var span hmTypes.Span
if err := k.cdc.UnmarshalBinaryBare(iterator.Value(), &span); err == nil {
Expand Down Expand Up @@ -251,7 +247,9 @@ func (k *Keeper) SelectNextProducers(ctx sdk.Context, seed common.Hash) (vals []
val.VotingPower = int64(value)
vals = append(vals, val)
}
} // sort by address
}

// sort by address
vals = hmTypes.SortValidatorByAddress(vals)

return vals, nil
Expand Down Expand Up @@ -324,7 +322,7 @@ func (k *Keeper) GetParams(ctx sdk.Context) (params types.Params) {
// Utils
//

// IterateSpansAndApplyFn interate spans and apply the given function.
// IterateSpansAndApplyFn iterates spans and apply the given function.
func (k *Keeper) IterateSpansAndApplyFn(ctx sdk.Context, f func(span hmTypes.Span) error) {
store := ctx.KVStore(k.storeKey)

Expand Down
2 changes: 1 addition & 1 deletion bridge/setu/listener/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (bl *BaseListener) StartHeaderProcess(ctx context.Context) {
}
}

// startPolling starts polling
// StartPolling starts polling
func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Duration) {
// How often to fire the passed in function in second
interval := pollInterval
Expand Down
161 changes: 131 additions & 30 deletions bridge/setu/listener/rootchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ func NewRootChainListener() *RootChainListener {
if err != nil {
panic(err)
}

abis := []*abi.ABI{
&contractCaller.RootChainABI,
&contractCaller.StateSenderABI,
&contractCaller.StakingInfoABI,
}
rootchainListener := &RootChainListener{

return &RootChainListener{
abis: abis,
stakingInfoAbi: &contractCaller.StakingInfoABI,
stateSenderAbi: &contractCaller.StateSenderABI,
}
return rootchainListener
}

// Start starts new block subscription
Expand Down Expand Up @@ -83,10 +84,11 @@ func (rl *RootChainListener) Start() error {
// start go routine to listen new header using subscription
go rl.StartSubscription(ctx, subscription)
}

// subscribed to new head
rl.Logger.Info("Subscribed to new head")

// Start self-healing process
go rl.startSelfHealing(ctx)

return nil
}

Expand All @@ -99,12 +101,12 @@ func (rl *RootChainListener) ProcessHeader(newHeader *types.Header) {
if err != nil {
return
}

requiredConfirmations := rootchainContext.ChainmanagerParams.MainchainTxConfirmations
latestNumber := newHeader.Number

// confirmation
confirmationBlocks := big.NewInt(0).SetUint64(requiredConfirmations)

if latestNumber.Cmp(confirmationBlocks) <= 0 {
rl.Logger.Error("Block number less than Confirmations required", "blockNumber", latestNumber.Uint64, "confirmationsRequired", confirmationBlocks.Uint64)
return
Expand All @@ -131,52 +133,52 @@ func (rl *RootChainListener) ProcessHeader(newHeader *types.Header) {
}
}

// to block
// Prepare blocks range
toBlock := latestNumber

if toBlock.Cmp(fromBlock) == -1 {
fromBlock = toBlock
}

// set last block to storage
if err := rl.storageClient.Put([]byte(lastRootBlockKey), []byte(toBlock.String()), nil); err != nil {
// Set last block to storage
if err = rl.storageClient.Put([]byte(lastRootBlockKey), []byte(toBlock.String()), nil); err != nil {
rl.Logger.Error("rl.storageClient.Put", "Error", err)
}

// query events
// Handle events
rl.queryAndBroadcastEvents(rootchainContext, fromBlock, toBlock)
}

// queryAndBroadcastEvents fetches supported events from the rootchain and handles all of them
func (rl *RootChainListener) queryAndBroadcastEvents(rootchainContext *RootChainListenerContext, fromBlock *big.Int, toBlock *big.Int) {
rl.Logger.Info("Query rootchain event logs", "fromBlock", fromBlock, "toBlock", toBlock)

// current public key
pubkey := helper.GetPubKey()
pubkeyBytes := pubkey[1:]
ctx, cancel := context.WithTimeout(context.Background(), rl.contractConnector.MainChainTimeout)
defer cancel()

// get chain params
chainParams := rootchainContext.ChainmanagerParams.ChainParams

// draft a query
query := ethereum.FilterQuery{FromBlock: fromBlock, ToBlock: toBlock, Addresses: []ethCommon.Address{
chainParams.RootChainAddress.EthAddress(),
chainParams.StakingInfoAddress.EthAddress(),
chainParams.StateSenderAddress.EthAddress(),
}}
// get logs from rootchain by filter

ethClientTimeout := rl.contractConnector.MainChainTimeout
ctx, cancel := context.WithTimeout(context.Background(), ethClientTimeout)
defer cancel()
logs, err := rl.contractConnector.MainChainClient.FilterLogs(ctx, query)
// Fetch events from the rootchain
logs, err := rl.contractConnector.MainChainClient.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: toBlock,
Addresses: []ethCommon.Address{
chainParams.RootChainAddress.EthAddress(),
chainParams.StakingInfoAddress.EthAddress(),
chainParams.StateSenderAddress.EthAddress(),
},
})
if err != nil {
rl.Logger.Error("Error while filtering logs", "error", err)
return
} else if len(logs) > 0 {
rl.Logger.Debug("New logs found", "numberOfLogs", len(logs))
}

// process filtered log
pubkey := helper.GetPubKey()
pubkeyBytes := pubkey[1:]

// Process filtered log
for _, vLog := range logs {
topic := vLog.Topics[0].Bytes()
for _, abiObject := range rl.abis {
Expand Down Expand Up @@ -274,7 +276,108 @@ func (rl *RootChainListener) queryAndBroadcastEvents(rootchainContext *RootChain
rl.SendTaskWithDelay("sendUnjailToHeimdall", selectedEvent.Name, logBytes, delay, event)
}
}
if selectedEvent == nil {
continue
}

rl.handleLog(vLog, selectedEvent, pubkeyBytes)
}
}
}

// handleLog handles the given log
func (rl *RootChainListener) handleLog(vLog types.Log, selectedEvent *abi.Event, pubkeyBytes []byte) {
logBytes, _ := json.Marshal(vLog)

rl.Logger.Debug("ReceivedEvent", "eventname", selectedEvent.Name)
switch selectedEvent.Name {
case "NewHeaderBlock":
if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, selectedEvent); isCurrentValidator {
rl.sendTaskWithDelay("sendCheckpointAckToHeimdall", selectedEvent.Name, logBytes, delay, selectedEvent)
}
case "Staked":
event := new(stakinginfo.StakinginfoStaked)
if err := helper.UnpackLog(rl.stakingInfoAbi, event, selectedEvent.Name, &vLog); err != nil {
rl.Logger.Error("Error while parsing event", "name", selectedEvent.Name, "error", err)
}
if bytes.Equal(event.SignerPubkey, pubkeyBytes) {
// topup has to be processed first before validator join. so adding delay.
delay := util.TaskDelayBetweenEachVal
rl.sendTaskWithDelay("sendValidatorJoinToHeimdall", selectedEvent.Name, logBytes, delay, event)
} else if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, event); isCurrentValidator {
// topup has to be processed first before validator join. so adding delay.
delay = delay + util.TaskDelayBetweenEachVal
rl.sendTaskWithDelay("sendValidatorJoinToHeimdall", selectedEvent.Name, logBytes, delay, event)
}

case "StakeUpdate":
event := new(stakinginfo.StakinginfoStakeUpdate)
if err := helper.UnpackLog(rl.stakingInfoAbi, event, selectedEvent.Name, &vLog); err != nil {
rl.Logger.Error("Error while parsing event", "name", selectedEvent.Name, "error", err)
}
if util.IsEventSender(rl.cliCtx, event.ValidatorId.Uint64()) {
rl.sendTaskWithDelay("sendStakeUpdateToHeimdall", selectedEvent.Name, logBytes, 0, event)
} else if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, event); isCurrentValidator {
rl.sendTaskWithDelay("sendStakeUpdateToHeimdall", selectedEvent.Name, logBytes, delay, event)
}

case "SignerChange":
event := new(stakinginfo.StakinginfoSignerChange)
if err := helper.UnpackLog(rl.stakingInfoAbi, event, selectedEvent.Name, &vLog); err != nil {
rl.Logger.Error("Error while parsing event", "name", selectedEvent.Name, "error", err)
}
if bytes.Equal(event.SignerPubkey, pubkeyBytes) {
rl.sendTaskWithDelay("sendSignerChangeToHeimdall", selectedEvent.Name, logBytes, 0, event)
} else if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, event); isCurrentValidator {
rl.sendTaskWithDelay("sendSignerChangeToHeimdall", selectedEvent.Name, logBytes, delay, event)
}

case "UnstakeInit":
event := new(stakinginfo.StakinginfoUnstakeInit)
if err := helper.UnpackLog(rl.stakingInfoAbi, event, selectedEvent.Name, &vLog); err != nil {
rl.Logger.Error("Error while parsing event", "name", selectedEvent.Name, "error", err)
}
if util.IsEventSender(rl.cliCtx, event.ValidatorId.Uint64()) {
rl.sendTaskWithDelay("sendUnstakeInitToHeimdall", selectedEvent.Name, logBytes, 0, event)
} else if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, event); isCurrentValidator {
rl.sendTaskWithDelay("sendUnstakeInitToHeimdall", selectedEvent.Name, logBytes, delay, event)
}

case "StateSynced":
event := new(statesender.StatesenderStateSynced)
if err := helper.UnpackLog(rl.stateSenderAbi, event, selectedEvent.Name, &vLog); err != nil {
rl.Logger.Error("Error while parsing event", "name", selectedEvent.Name, "error", err)
}
rl.Logger.Info("StateSyncedEvent: detected", "stateSyncId", event.Id)
if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, event); isCurrentValidator {
rl.sendTaskWithDelay("sendStateSyncedToHeimdall", selectedEvent.Name, logBytes, delay, event)
}

case "TopUpFee":
event := new(stakinginfo.StakinginfoTopUpFee)
if err := helper.UnpackLog(rl.stakingInfoAbi, event, selectedEvent.Name, &vLog); err != nil {
rl.Logger.Error("Error while parsing event", "name", selectedEvent.Name, "error", err)
}
if bytes.Equal(event.User.Bytes(), helper.GetAddress()) {
rl.sendTaskWithDelay("sendTopUpFeeToHeimdall", selectedEvent.Name, logBytes, 0, event)
} else if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, event); isCurrentValidator {
rl.sendTaskWithDelay("sendTopUpFeeToHeimdall", selectedEvent.Name, logBytes, delay, event)
}

case "Slashed":
if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, selectedEvent); isCurrentValidator {
rl.sendTaskWithDelay("sendTickAckToHeimdall", selectedEvent.Name, logBytes, delay, selectedEvent)
}

case "UnJailed":
event := new(stakinginfo.StakinginfoUnJailed)
if err := helper.UnpackLog(rl.stakingInfoAbi, event, selectedEvent.Name, &vLog); err != nil {
rl.Logger.Error("Error while parsing event", "name", selectedEvent.Name, "error", err)
}
if util.IsEventSender(rl.cliCtx, event.ValidatorId.Uint64()) {
rl.sendTaskWithDelay("sendUnjailToHeimdall", selectedEvent.Name, logBytes, 0, event)
} else if isCurrentValidator, delay := util.CalculateTaskDelay(rl.cliCtx, event); isCurrentValidator {
rl.sendTaskWithDelay("sendUnjailToHeimdall", selectedEvent.Name, logBytes, delay, event)
}
}
}
Expand All @@ -301,16 +404,14 @@ func (rl *RootChainListener) SendTaskWithDelay(taskName string, eventName string
eta := time.Now().Add(delay)
signature.ETA = &eta
rl.Logger.Info("Sending task", "taskName", taskName, "currentTime", time.Now(), "delayTime", eta)

_, err := rl.queueConnector.Server.SendTask(signature)
if err != nil {
rl.Logger.Error("Error sending task", "taskName", taskName, "error", err)
}
}

//
// utils
//

// getRootChainContext returns the root chain context
func (rl *RootChainListener) getRootChainContext() (*RootChainListenerContext, error) {
chainmanagerParams, err := util.GetChainmanagerParams(rl.cliCtx)
if err != nil {
Expand Down
Loading

0 comments on commit a00443a

Please sign in to comment.