Skip to content

Commit

Permalink
Bugfix/fix turn unit test memory leak (#626)
Browse files Browse the repository at this point in the history
* Fix cyclic dependency in ChannelBind/Permission

- Both structs have a timer-callback like mechanism to remove itself from the table which held by Allocation. In this case, cyclic dependency is created when the struct holds Arc<> pointing to the table owning it. We replace the Arc<> in the callback with Weak<>

* Fix cyclic dependency in Allocation

- Allocation hold Arc<> to the table of Allocations owned by AllocationManager and thus creates the cyclic dependency. We replace it with Weak<> when pointing to parent
  • Loading branch information
mutexd authored Nov 3, 2024
1 parent 2ec027f commit 03237d0
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 16 deletions.
4 changes: 2 additions & 2 deletions turn/src/allocation/allocation_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct ManagerConfig {

/// `Manager` is used to hold active allocations.
pub struct Manager {
allocations: AllocationMap,
allocations: Arc<Mutex<AllocationMap>>,
reservations: Arc<Mutex<HashMap<String, u16>>>,
relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
Expand Down Expand Up @@ -107,9 +107,9 @@ impl Manager {
relay_addr,
five_tuple,
username,
Arc::downgrade(&self.allocations),
self.alloc_close_notify.clone(),
);
a.allocations = Some(Arc::clone(&self.allocations));

log::debug!("listening on relay addr: {:?}", a.relay_addr);
a.start(lifetime).await;
Expand Down
18 changes: 18 additions & 0 deletions turn/src/allocation/allocation_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ async fn test_has_permission() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -50,12 +52,14 @@ async fn test_add_permission() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand All @@ -74,12 +78,14 @@ async fn test_remove_permission() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -107,12 +113,14 @@ async fn test_add_channel_bind() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -141,12 +149,14 @@ async fn test_get_channel_by_number() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -177,12 +187,14 @@ async fn test_get_channel_by_addr() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand All @@ -209,12 +221,14 @@ async fn test_remove_channel_bind() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -246,12 +260,14 @@ async fn test_allocation_refresh() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand All @@ -268,12 +284,14 @@ async fn test_allocation_close() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down
6 changes: 3 additions & 3 deletions turn/src/allocation/channel_bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod channel_bind_test;

use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use portable_atomic::AtomicBool;
use tokio::sync::Mutex;
Expand All @@ -18,7 +18,7 @@ use crate::proto::channum::*;
pub struct ChannelBind {
pub(crate) peer: SocketAddr,
pub(crate) number: ChannelNumber,
pub(crate) channel_bindings: Option<Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>>,
pub(crate) channel_bindings: Option<Weak<Mutex<HashMap<ChannelNumber, ChannelBind>>>>,
reset_tx: Option<mpsc::Sender<Duration>>,
timer_expired: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -51,7 +51,7 @@ impl ChannelBind {
while !done {
tokio::select! {
_ = &mut timer => {
if let Some(cbs) = &channel_bindings{
if let Some(cbs) = &channel_bindings.clone().and_then(|x| x.upgrade()) {
let mut cb = cbs.lock().await;
if cb.remove(&number).is_none() {
log::error!("Failed to remove ChannelBind for {}", number);
Expand Down
2 changes: 2 additions & 0 deletions turn/src/allocation/channel_bind/channel_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ async fn create_channel_bind(lifetime: Duration) -> Result<Allocation> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down
17 changes: 9 additions & 8 deletions turn/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use channel_bind::*;
use five_tuple::*;
Expand All @@ -34,7 +34,7 @@ use crate::proto::*;

const RTP_MTU: usize = 1500;

pub type AllocationMap = Arc<Mutex<HashMap<FiveTuple, Arc<Allocation>>>>;
pub type AllocationMap = HashMap<FiveTuple, Arc<Allocation>>;

/// Information about an [`Allocation`].
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct Allocation {
username: Username,
permissions: Arc<Mutex<HashMap<String, Permission>>>,
channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
pub(crate) allocations: Option<AllocationMap>,
allocations: Weak<Mutex<AllocationMap>>,
reset_tx: SyncMutex<Option<mpsc::Sender<Duration>>>,
timer_expired: Arc<AtomicBool>,
closed: AtomicBool, // Option<mpsc::Receiver<()>>,
Expand All @@ -98,6 +98,7 @@ impl Allocation {
relay_addr: SocketAddr,
five_tuple: FiveTuple,
username: Username,
allocation_map: Weak<Mutex<AllocationMap>>,
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
) -> Self {
Allocation {
Expand All @@ -109,7 +110,7 @@ impl Allocation {
username,
permissions: Arc::new(Mutex::new(HashMap::new())),
channel_bindings: Arc::new(Mutex::new(HashMap::new())),
allocations: None,
allocations: allocation_map,
reset_tx: SyncMutex::new(None),
timer_expired: Arc::new(AtomicBool::new(false)),
closed: AtomicBool::new(false),
Expand Down Expand Up @@ -137,7 +138,7 @@ impl Allocation {
}
}

p.permissions = Some(Arc::clone(&self.permissions));
p.permissions = Some(Arc::downgrade(&self.permissions));
p.start(PERMISSION_TIMEOUT).await;

{
Expand Down Expand Up @@ -184,7 +185,7 @@ impl Allocation {
let peer = c.peer;

// Add or refresh this channel.
c.channel_bindings = Some(Arc::clone(&self.channel_bindings));
c.channel_bindings = Some(Arc::downgrade(&self.channel_bindings));
c.start(lifetime).await;

{
Expand Down Expand Up @@ -279,7 +280,7 @@ impl Allocation {
while !done {
tokio::select! {
_ = &mut timer => {
if let Some(allocs) = &allocations{
if let Some(allocs) = &allocations.upgrade(){
let mut allocs = allocs.lock().await;
if let Some(a) = allocs.remove(&five_tuple) {
let _ = a.close().await;
Expand Down Expand Up @@ -355,7 +356,7 @@ impl Allocation {
match result {
Ok((n, src_addr)) => (n, src_addr),
Err(_) => {
if let Some(allocs) = &allocations {
if let Some(allocs) = &allocations.upgrade() {
let mut allocs = allocs.lock().await;
allocs.remove(&five_tuple);
}
Expand Down
6 changes: 3 additions & 3 deletions turn/src/allocation/permission.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use portable_atomic::AtomicBool;
use tokio::sync::Mutex;
Expand All @@ -15,7 +15,7 @@ pub(crate) const PERMISSION_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// https://tools.ietf.org/html/rfc5766#section-2.3
pub struct Permission {
pub(crate) addr: SocketAddr,
pub(crate) permissions: Option<Arc<Mutex<HashMap<String, Permission>>>>,
pub(crate) permissions: Option<Weak<Mutex<HashMap<String, Permission>>>>,
reset_tx: Option<mpsc::Sender<Duration>>,
timer_expired: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Permission {
while !done {
tokio::select! {
_ = &mut timer => {
if let Some(perms) = &permissions{
if let Some(perms) = &permissions.clone().and_then(|x| x.upgrade()) {
let mut p = perms.lock().await;
p.remove(&addr2ipfingerprint(&addr));
}
Expand Down

0 comments on commit 03237d0

Please sign in to comment.