From f6572d5d4d5b6ab6e3740de2fe02c00e09d7c289 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Feb 2025 18:16:36 -0500 Subject: [PATCH 1/5] network: fix peerstore stats update race --- network/p2p/peerstore/peerstore.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 5516db6250..059a1ad9d9 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -148,7 +148,9 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura } // Remove the expired elements from e.data[addr].recentConnectionTimes - ps.popNElements(numElmtsToRemove, peerID) + ad.recentConnectionTimes = ad.recentConnectionTimes[numElmtsToRemove:] + _ = ps.Put(peerID, addressDataKey, ad) + // If there are max number of connections within the time window, wait metadata, _ = ps.Get(peerID, addressDataKey) ad, ok = metadata.(addressData) @@ -310,16 +312,6 @@ func (ps *PeerStore) appendTime(peerID peer.ID, t time.Time) { _ = ps.Put(peerID, addressDataKey, ad) } -// PopEarliestTime removes the earliest time from recentConnectionTimes in -// addressData for addr -// It is expected to be later than ConnectionsRateLimitingWindow -func (ps *PeerStore) popNElements(n int, peerID peer.ID) { - data, _ := ps.Get(peerID, addressDataKey) - ad := data.(addressData) - ad.recentConnectionTimes = ad.recentConnectionTimes[n:] - _ = ps.Put(peerID, addressDataKey, ad) -} - func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo { o := make([]*peer.AddrInfo, 0, len(ps.Peers())) for _, peerID := range ps.Peers() { From af00ab88a7556149799ed95006ecb980001ef6d6 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 25 Feb 2025 10:30:02 -0500 Subject: [PATCH 2/5] peerstore: sync addressData.recentConnectionTimes access --- network/p2p/peerstore/peerstore.go | 49 +++++++++++++----------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 059a1ad9d9..d1a27ab803 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -127,7 +127,6 @@ func (ps *PeerStore) UpdateRetryAfter(addr string, retryAfter time.Time) { func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Duration, time.Time) { curTime := time.Now() var timeSince time.Duration - var numElmtsToRemove int peerID := peer.ID(addrOrPeerID) metadata, err := ps.Get(peerID, addressDataKey) if err != nil { @@ -137,8 +136,13 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura if !ok { return false, 0 /* not used */, curTime /* not used */ } + // Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds - for numElmtsToRemove < len(ad.recentConnectionTimes) { + ad.mu.Lock() + + originalLen := len(ad.recentConnectionTimes) + var numElmtsToRemove int + for numElmtsToRemove < originalLen { timeSince = curTime.Sub(ad.recentConnectionTimes[numElmtsToRemove]) if timeSince >= ps.connectionsRateLimitingWindow { numElmtsToRemove++ @@ -149,27 +153,22 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura // Remove the expired elements from e.data[addr].recentConnectionTimes ad.recentConnectionTimes = ad.recentConnectionTimes[numElmtsToRemove:] - _ = ps.Put(peerID, addressDataKey, ad) // If there are max number of connections within the time window, wait - metadata, _ = ps.Get(peerID, addressDataKey) - ad, ok = metadata.(addressData) - if !ok { - return false, 0 /* not used */, curTime /* not used */ - } - numElts := len(ad.recentConnectionTimes) - if uint(numElts) >= ps.connectionsRateLimitingCount { - return true, /* true */ - ps.connectionsRateLimitingWindow - timeSince, curTime /* not used */ + remainingLength := originalLen - numElmtsToRemove + var waitTime time.Duration + if uint(remainingLength) >= ps.connectionsRateLimitingCount { + waitTime = ps.connectionsRateLimitingWindow - timeSince + } else { + // Else, there is space in connectionsRateLimitingCount. The + // connection request of the caller will proceed + // Append the provisional time for the next connection request + ad.recentConnectionTimes = append(ad.recentConnectionTimes, curTime) } + ad.mu.Unlock() - // Else, there is space in connectionsRateLimitingCount. The - // connection request of the caller will proceed - // Update curTime, since it may have significantly changed if waited - provisionalTime := time.Now() - // Append the provisional time for the next connection request - ps.appendTime(peerID, provisionalTime) - return true, 0 /* no wait. proceed */, provisionalTime + _ = ps.Put(peerID, addressDataKey, ad) + return true, waitTime, curTime } // UpdateConnectionTime updates the connection time for the given address. @@ -189,6 +188,9 @@ func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime t }() // Find the provisionalTime and update it + ad.mu.Lock() + defer ad.mu.Unlock() + entry := ad.recentConnectionTimes for indx, val := range entry { if provisionalTime == val { @@ -303,15 +305,6 @@ func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) { } } -// AppendTime adds the current time to recentConnectionTimes in -// addressData of addr -func (ps *PeerStore) appendTime(peerID peer.ID, t time.Time) { - data, _ := ps.Get(peerID, addressDataKey) - ad := data.(addressData) - ad.recentConnectionTimes = append(ad.recentConnectionTimes, t) - _ = ps.Put(peerID, addressDataKey, ad) -} - func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo { o := make([]*peer.AddrInfo, 0, len(ps.Peers())) for _, peerID := range ps.Peers() { From 6a845a68a5f0a0fa00b60bbfa163db1defa4974c Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 28 Feb 2025 12:56:14 -0500 Subject: [PATCH 3/5] CR: fix zero vals --- network/p2p/peerstore/peerstore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 47dcc424fe..b9900234cb 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -138,7 +138,7 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura ad.mu.Lock() originalLen := len(ad.recentConnectionTimes) - var numElmtsToRemove int + numElmtsToRemove := 0 for numElmtsToRemove < originalLen { timeSince = curTime.Sub(ad.recentConnectionTimes[numElmtsToRemove]) if timeSince >= ps.connectionsRateLimitingWindow { @@ -153,7 +153,7 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura // If there are max number of connections within the time window, wait remainingLength := originalLen - numElmtsToRemove - var waitTime time.Duration + var waitTime time.Duration = 0 if uint(remainingLength) >= ps.connectionsRateLimitingCount { waitTime = ps.connectionsRateLimitingWindow - timeSince } else { From c83808e97714a464e5aef26fc43a82bf67b8e497 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 28 Feb 2025 13:19:49 -0500 Subject: [PATCH 4/5] remove addressData lock --- network/p2p/peerstore/peerstore.go | 70 +++++++++++++++++++----------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index b9900234cb..bfa1942c1e 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -42,6 +42,7 @@ type PeerStore struct { peerStoreCAB connectionsRateLimitingCount uint connectionsRateLimitingWindow time.Duration + lock deadlock.Mutex } // addressData: holds the information associated with each phonebook address. @@ -55,7 +56,6 @@ type addressData struct { // networkNames: lists the networks to which the given address belongs. networkNames map[string]bool - mu *deadlock.RWMutex // roles is the roles that this address serves. roles phonebook.RoleSet @@ -123,7 +123,10 @@ func (ps *PeerStore) UpdateRetryAfter(addr string, retryAfter time.Time) { // The provisional time should be updated after the connection with UpdateConnectionTime func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Duration, time.Time) { curTime := time.Now() - var timeSince time.Duration + + ps.lock.Lock() + defer ps.lock.Unlock() + peerID := peer.ID(addrOrPeerID) metadata, err := ps.Get(peerID, addressDataKey) if err != nil { @@ -135,10 +138,9 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura } // Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds - ad.mu.Lock() - originalLen := len(ad.recentConnectionTimes) numElmtsToRemove := 0 + timeSince := time.Duration(0) for numElmtsToRemove < originalLen { timeSince = curTime.Sub(ad.recentConnectionTimes[numElmtsToRemove]) if timeSince >= ps.connectionsRateLimitingWindow { @@ -149,27 +151,29 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura } // Remove the expired elements from e.data[addr].recentConnectionTimes - ad.recentConnectionTimes = ad.recentConnectionTimes[numElmtsToRemove:] + ps.popNElements(numElmtsToRemove, peerID) // If there are max number of connections within the time window, wait remainingLength := originalLen - numElmtsToRemove - var waitTime time.Duration = 0 if uint(remainingLength) >= ps.connectionsRateLimitingCount { - waitTime = ps.connectionsRateLimitingWindow - timeSince - } else { - // Else, there is space in connectionsRateLimitingCount. The - // connection request of the caller will proceed - // Append the provisional time for the next connection request - ad.recentConnectionTimes = append(ad.recentConnectionTimes, curTime) + return true, /* true */ + ps.connectionsRateLimitingWindow - timeSince, curTime /* not used */ } - ad.mu.Unlock() - _ = ps.Put(peerID, addressDataKey, ad) - return true, waitTime, curTime + // Else, there is space in connectionsRateLimitingCount. The + // connection request of the caller will proceed + // Update curTime, since it may have significantly changed if waited + provisionalTime := time.Now() + // Append the provisional time for the next connection request + ps.appendTime(peerID, provisionalTime) + return true, 0 /* no wait. proceed */, provisionalTime } // UpdateConnectionTime updates the connection time for the given address. func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool { + ps.lock.Lock() + defer ps.lock.Unlock() + peerID := peer.ID(addrOrPeerID) metadata, err := ps.Get(peerID, addressDataKey) if err != nil { @@ -185,9 +189,6 @@ func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime t }() // Find the provisionalTime and update it - ad.mu.Lock() - defer ad.mu.Unlock() - entry := ad.recentConnectionTimes for indx, val := range entry { if provisionalTime == val { @@ -210,6 +211,9 @@ func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime t // existing items that aren't included in addressesThey are being removed // matching entries roles gets updated as needed and persistent peers are not touched func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName string, role phonebook.Role) { + ps.lock.Lock() + defer ps.lock.Unlock() + // prepare a map of items we'd like to remove. removeItems := make(map[peer.ID]bool, 0) peerIDs := ps.Peers() @@ -218,7 +222,6 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName if data != nil { ad := data.(addressData) updated := false - ad.mu.RLock() if ad.networkNames[networkName] && !ad.roles.IsPersistent(role) { if ad.roles.Is(role) { removeItems[pid] = true @@ -227,8 +230,6 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName updated = true } } - ad.mu.RUnlock() - if updated { _ = ps.Put(pid, addressDataKey, ad) } @@ -241,10 +242,8 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName // we already have this // update the networkName and role ad := data.(addressData) - ad.mu.Lock() ad.networkNames[networkName] = true ad.roles.Add(role) - ad.mu.Unlock() _ = ps.Put(info.ID, addressDataKey, ad) // do not remove this entry @@ -267,6 +266,9 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName // i.e. they won't be replaced by ReplacePeerList calls. // If a peer is already in the peerstore, its role will be updated. func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName string, role phonebook.Role) { + ps.lock.Lock() + defer ps.lock.Unlock() + for _, info := range addrInfo { data, _ := ps.Get(info.ID, addressDataKey) if data != nil { @@ -293,7 +295,6 @@ func (ps *PeerStore) Length() int { func makePhonebookEntryData(networkName string, role phonebook.Role, persistent bool) addressData { pbData := addressData{ networkNames: make(map[string]bool), - mu: &deadlock.RWMutex{}, recentConnectionTimes: make([]time.Time, 0), roles: phonebook.MakeRoleSet(role, persistent), } @@ -301,16 +302,33 @@ func makePhonebookEntryData(networkName string, role phonebook.Role, persistent return pbData } +// appendTime adds the current time to recentConnectionTimes in +// addressData of addr +func (ps *PeerStore) appendTime(peerID peer.ID, t time.Time) { + data, _ := ps.Get(peerID, addressDataKey) + ad := data.(addressData) + ad.recentConnectionTimes = append(ad.recentConnectionTimes, t) + _ = ps.Put(peerID, addressDataKey, ad) +} + +// popNElements removes the earliest time from recentConnectionTimes in +// addressData for addr +// It is expected to be later than ConnectionsRateLimitingWindow +func (ps *PeerStore) popNElements(n int, peerID peer.ID) { + data, _ := ps.Get(peerID, addressDataKey) + ad := data.(addressData) + ad.recentConnectionTimes = ad.recentConnectionTimes[n:] + _ = ps.Put(peerID, addressDataKey, ad) +} + func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) { data, err := ps.Get(peerID, addressDataKey) if err != nil { return } ad := data.(addressData) - ad.mu.Lock() delete(ad.networkNames, networkName) isEmpty := len(ad.networkNames) == 0 - ad.mu.Unlock() if isEmpty { ps.ClearAddrs(peerID) _ = ps.Put(peerID, addressDataKey, nil) From f88628093cce88acebb4bbabe0518846917fb178 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 28 Feb 2025 13:40:22 -0500 Subject: [PATCH 5/5] self review --- network/p2p/peerstore/peerstore.go | 41 ++++++++++++++++-------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index bfa1942c1e..9bb5a46e88 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -138,10 +138,9 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura } // Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds - originalLen := len(ad.recentConnectionTimes) numElmtsToRemove := 0 timeSince := time.Duration(0) - for numElmtsToRemove < originalLen { + for numElmtsToRemove < len(ad.recentConnectionTimes) { timeSince = curTime.Sub(ad.recentConnectionTimes[numElmtsToRemove]) if timeSince >= ps.connectionsRateLimitingWindow { numElmtsToRemove++ @@ -154,12 +153,16 @@ func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Dura ps.popNElements(numElmtsToRemove, peerID) // If there are max number of connections within the time window, wait - remainingLength := originalLen - numElmtsToRemove - if uint(remainingLength) >= ps.connectionsRateLimitingCount { + metadata, _ = ps.Get(peerID, addressDataKey) + ad, ok = metadata.(addressData) + if !ok { + return false, 0 /* not used */, curTime /* not used */ + } + numElts := len(ad.recentConnectionTimes) + if uint(numElts) >= ps.connectionsRateLimitingCount { return true, /* true */ ps.connectionsRateLimitingWindow - timeSince, curTime /* not used */ } - // Else, there is space in connectionsRateLimitingCount. The // connection request of the caller will proceed // Update curTime, since it may have significantly changed if waited @@ -302,6 +305,20 @@ func makePhonebookEntryData(networkName string, role phonebook.Role, persistent return pbData } +func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) { + data, err := ps.Get(peerID, addressDataKey) + if err != nil { + return + } + ad := data.(addressData) + delete(ad.networkNames, networkName) + isEmpty := len(ad.networkNames) == 0 + if isEmpty { + ps.ClearAddrs(peerID) + _ = ps.Put(peerID, addressDataKey, nil) + } +} + // appendTime adds the current time to recentConnectionTimes in // addressData of addr func (ps *PeerStore) appendTime(peerID peer.ID, t time.Time) { @@ -321,20 +338,6 @@ func (ps *PeerStore) popNElements(n int, peerID peer.ID) { _ = ps.Put(peerID, addressDataKey, ad) } -func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) { - data, err := ps.Get(peerID, addressDataKey) - if err != nil { - return - } - ad := data.(addressData) - delete(ad.networkNames, networkName) - isEmpty := len(ad.networkNames) == 0 - if isEmpty { - ps.ClearAddrs(peerID) - _ = ps.Put(peerID, addressDataKey, nil) - } -} - func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.Role) []*peer.AddrInfo { o := make([]*peer.AddrInfo, 0, len(ps.Peers())) for _, peerID := range ps.Peers() {