Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
PLT-255 Fixed plutus-pab integration test
Browse files Browse the repository at this point in the history
* Reorganized the placement of some functions

* Fixed the `awaitSlot` blockchain action in `plutus-pab` as it wasn't
  working. It was using as reference the slot of the last synced block
  instead of the actual current slot.

* Patched temporarly the PubKey contract so that it waits a bit before
  querying the chain-index. In the future, once we fix the discrepancy
  between information shared between plutus-pab and plutus-chain-index,
  this should be removed.

* Fixed the number of events that are processed by plutus-chain-index
  once we are in sync with the local node. The reason is that once we
  are fully in sync with the local node, we want to process as much
  information as possible and not wait for the queue to be filled.
  • Loading branch information
koslambrou committed Jun 6, 2022
1 parent 509722c commit 7c86a8e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 11 deletions.
4 changes: 2 additions & 2 deletions plutus-chain-index/src/Plutus/ChainIndex/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ period = 2_000_000 -- 2s
-- This approach helps to process blocks with a constant memory usage.
--
-- However, once we are in sync with the node, we want to process every block
-- instead of batches of blocks so that we can update the database as quickly
-- instead of batches of blocks so that we can update the database as frequently
-- as possible.
--
-- Just accumulating 'queueSize' blocks doesn't work as a block can have any number of transactions.
Expand All @@ -45,7 +45,7 @@ measureEventQueueSizeByTxs maxQueueSize (RollForward (CI.Block syncTip transacti
let syncState = getSyncState (tipAsPoint syncTip) (tipAsPoint nodeTip)
txLen = fromIntegral $ length transactions
in if isSyncStateSynced syncState
then max (maxQueueSize + 1) txLen
then maxQueueSize + 1
else txLen
measureEventQueueSizeByTxs maxQueueSize _ = maxQueueSize + 1 -- to handle resume and rollback asap

Expand Down
51 changes: 42 additions & 9 deletions plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,17 @@ startNodeClient ::
-> InstancesState -- ^ In-memory state of running contract instances
-> IO BlockchainEnv
startNodeClient config instancesState = do
let Config { nodeServerConfig = PABServerConfig{pscSocketPath = socket, pscSlotConfig = slotConfig, pscNodeMode, pscNetworkId = NetworkIdWrapper networkId}
, developmentOptions = DevelopmentOptions{pabRollbackHistory, pabResumeFrom = resumePoint} } = config
let Config { nodeServerConfig =
PABServerConfig { pscSocketPath = socket
, pscSlotConfig = slotConfig
, pscNodeMode
, pscNetworkId = NetworkIdWrapper networkId
}
, developmentOptions =
DevelopmentOptions { pabRollbackHistory
, pabResumeFrom = resumePoint
}
} = config
params <- Params.fromPABServerConfig $ nodeServerConfig config
env <- STM.atomically $ emptyBlockchainEnv pabRollbackHistory params
case pscNodeMode of
Expand All @@ -70,9 +79,15 @@ startNodeClient config instancesState = do
AlonzoNode -> do
let resumePoints = maybeToList $ toCardanoPoint resumePoint
void $ Client.runChainSync socket nullTracer slotConfig networkId resumePoints
(\block -> do slot <- TimeSlot.currentSlot slotConfig
STM.atomically $ STM.writeTVar (beCurrentSlot env) slot
handleSyncAction $ processChainSyncEvent instancesState env block
(\block -> do
-- We store the actual current slot in `BlockchainEnv`. Thus,
-- at every new block from the local node, we request for the
-- current slot number and store it. The actual current slot is
-- useful/necessary for blocking contract actions like `awaitSlot`.
slot <- TimeSlot.currentSlot slotConfig
STM.atomically $ STM.writeTVar (beCurrentSlot env) slot

handleSyncAction $ processChainSyncEvent instancesState env block
)
pure env

Expand All @@ -90,7 +105,10 @@ handleSyncAction action = do
either (error . show) (const $ pure ()) result

updateInstances :: IndexedBlock -> InstanceClientEnv -> STM ()
updateInstances IndexedBlock{ibUtxoSpent, ibUtxoProduced} InstanceClientEnv{ceUtxoSpentRequests, ceUtxoProducedRequests} = do
updateInstances
IndexedBlock{ibUtxoSpent, ibUtxoProduced}
InstanceClientEnv{ceUtxoSpentRequests, ceUtxoProducedRequests} = do

forM_ (Map.intersectionWith (,) ibUtxoSpent ceUtxoSpentRequests) $ \(onChainTx, requests) ->
traverse (\OpenTxOutSpentRequest{osrSpendingTx} -> STM.tryPutTMVar osrSpendingTx onChainTx) requests
forM_ (Map.intersectionWith (,) ibUtxoProduced ceUtxoProducedRequests) $ \(txns, requests) ->
Expand Down Expand Up @@ -193,7 +211,15 @@ updateTransactionState
-> BlockchainEnv
-> t (TxId, TxOutBalance, TxValidity)
-> STM (Either SyncActionFailure (Slot, BlockNumber))
updateTransactionState tip env@BlockchainEnv{beRollbackHistory, beTxChanges, beTxOutChanges, beLastSyncedBlockNo} xs = do
updateTransactionState
tip
env@BlockchainEnv{ beRollbackHistory
, beTxChanges
, beTxOutChanges
, beLastSyncedBlockNo
}
xs = do

txIdStateIndex <- STM.readTVar beTxChanges
let txIdState = _usTxUtxoData $ utxoState txIdStateIndex
txUtxoBalanceIndex <- STM.readTVar beTxOutChanges
Expand All @@ -205,7 +231,9 @@ updateTransactionState tip env@BlockchainEnv{beRollbackHistory, beTxChanges, beT
txUtxoBalanceInsert = insert (UtxoState txUtxoBalance' tip) txUtxoBalanceIndex

case (txIdStateInsert, txUtxoBalanceInsert) of
(Right InsertUtxoSuccess{newIndex=newTxIdState}, Right InsertUtxoSuccess{newIndex=newTxOutBalance}) -> do -- TODO: Get tx out status another way
(Right InsertUtxoSuccess{newIndex=newTxIdState}
, Right InsertUtxoSuccess{newIndex=newTxOutBalance}) -> do -- TODO: Get tx out status another way

STM.writeTVar beTxChanges $ trimIx beRollbackHistory newTxIdState
STM.writeTVar beTxOutChanges $ trimIx beRollbackHistory newTxOutBalance
STM.writeTVar beLastSyncedBlockNo (succ blockNumber)
Expand Down Expand Up @@ -235,7 +263,12 @@ insertNewTx blockNumber TxIdState{txnsConfirmed, txnsDeleted} (txi, _, txValidit

-- | Go through the transactions in a block, updating the 'BlockchainEnv'
-- when any interesting addresses or transactions have changed.
processMockBlock :: InstancesState -> BlockchainEnv -> Block -> Slot -> STM (Either SyncActionFailure (Slot, BlockNumber))
processMockBlock
:: InstancesState
-> BlockchainEnv
-> Block
-> Slot
-> STM (Either SyncActionFailure (Slot, BlockNumber))
processMockBlock
instancesState
env@BlockchainEnv{beCurrentSlot, beLastSyncedBlockSlot, beLastSyncedBlockNo}
Expand Down

0 comments on commit 7c86a8e

Please sign in to comment.