Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/fix turn unit test memory leak #626

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions turn/src/allocation/allocation_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct ManagerConfig {

/// `Manager` is used to hold active allocations.
pub struct Manager {
allocations: AllocationMap,
allocations: Arc<Mutex<AllocationMap>>,
reservations: Arc<Mutex<HashMap<String, u16>>>,
relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
Expand Down Expand Up @@ -107,9 +107,9 @@ impl Manager {
relay_addr,
five_tuple,
username,
Arc::downgrade(&self.allocations),
self.alloc_close_notify.clone(),
);
a.allocations = Some(Arc::clone(&self.allocations));

log::debug!("listening on relay addr: {:?}", a.relay_addr);
a.start(lifetime).await;
Expand Down
18 changes: 18 additions & 0 deletions turn/src/allocation/allocation_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ async fn test_has_permission() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -50,12 +52,14 @@ async fn test_add_permission() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand All @@ -74,12 +78,14 @@ async fn test_remove_permission() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -107,12 +113,14 @@ async fn test_add_channel_bind() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -141,12 +149,14 @@ async fn test_get_channel_by_number() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -177,12 +187,14 @@ async fn test_get_channel_by_addr() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand All @@ -209,12 +221,14 @@ async fn test_remove_channel_bind() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down Expand Up @@ -246,12 +260,14 @@ async fn test_allocation_refresh() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand All @@ -268,12 +284,14 @@ async fn test_allocation_close() -> Result<()> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down
6 changes: 3 additions & 3 deletions turn/src/allocation/channel_bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod channel_bind_test;

use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use portable_atomic::AtomicBool;
use tokio::sync::Mutex;
Expand All @@ -18,7 +18,7 @@ use crate::proto::channum::*;
pub struct ChannelBind {
pub(crate) peer: SocketAddr,
pub(crate) number: ChannelNumber,
pub(crate) channel_bindings: Option<Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>>,
pub(crate) channel_bindings: Option<Weak<Mutex<HashMap<ChannelNumber, ChannelBind>>>>,
reset_tx: Option<mpsc::Sender<Duration>>,
timer_expired: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -51,7 +51,7 @@ impl ChannelBind {
while !done {
tokio::select! {
_ = &mut timer => {
if let Some(cbs) = &channel_bindings{
if let Some(cbs) = &channel_bindings.clone().and_then(|x| x.upgrade()) {
let mut cb = cbs.lock().await;
if cb.remove(&number).is_none() {
log::error!("Failed to remove ChannelBind for {}", number);
Expand Down
2 changes: 2 additions & 0 deletions turn/src/allocation/channel_bind/channel_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ async fn create_channel_bind(lifetime: Duration) -> Result<Allocation> {
let turn_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let relay_socket = Arc::clone(&turn_socket);
let relay_addr = relay_socket.local_addr()?;
let allocations = Arc::new(Mutex::new(AllocationMap::new()));
let a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
Arc::downgrade(&allocations),
None,
);

Expand Down
17 changes: 9 additions & 8 deletions turn/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use channel_bind::*;
use five_tuple::*;
Expand All @@ -34,7 +34,7 @@ use crate::proto::*;

const RTP_MTU: usize = 1500;

pub type AllocationMap = Arc<Mutex<HashMap<FiveTuple, Arc<Allocation>>>>;
pub type AllocationMap = HashMap<FiveTuple, Arc<Allocation>>;

/// Information about an [`Allocation`].
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct Allocation {
username: Username,
permissions: Arc<Mutex<HashMap<String, Permission>>>,
channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
pub(crate) allocations: Option<AllocationMap>,
allocations: Weak<Mutex<AllocationMap>>,
reset_tx: SyncMutex<Option<mpsc::Sender<Duration>>>,
timer_expired: Arc<AtomicBool>,
closed: AtomicBool, // Option<mpsc::Receiver<()>>,
Expand All @@ -98,6 +98,7 @@ impl Allocation {
relay_addr: SocketAddr,
five_tuple: FiveTuple,
username: Username,
allocation_map: Weak<Mutex<AllocationMap>>,
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
) -> Self {
Allocation {
Expand All @@ -109,7 +110,7 @@ impl Allocation {
username,
permissions: Arc::new(Mutex::new(HashMap::new())),
channel_bindings: Arc::new(Mutex::new(HashMap::new())),
allocations: None,
allocations: allocation_map,
reset_tx: SyncMutex::new(None),
timer_expired: Arc::new(AtomicBool::new(false)),
closed: AtomicBool::new(false),
Expand Down Expand Up @@ -137,7 +138,7 @@ impl Allocation {
}
}

p.permissions = Some(Arc::clone(&self.permissions));
p.permissions = Some(Arc::downgrade(&self.permissions));
p.start(PERMISSION_TIMEOUT).await;

{
Expand Down Expand Up @@ -184,7 +185,7 @@ impl Allocation {
let peer = c.peer;

// Add or refresh this channel.
c.channel_bindings = Some(Arc::clone(&self.channel_bindings));
c.channel_bindings = Some(Arc::downgrade(&self.channel_bindings));
c.start(lifetime).await;

{
Expand Down Expand Up @@ -279,7 +280,7 @@ impl Allocation {
while !done {
tokio::select! {
_ = &mut timer => {
if let Some(allocs) = &allocations{
if let Some(allocs) = &allocations.upgrade(){
let mut allocs = allocs.lock().await;
if let Some(a) = allocs.remove(&five_tuple) {
let _ = a.close().await;
Expand Down Expand Up @@ -355,7 +356,7 @@ impl Allocation {
match result {
Ok((n, src_addr)) => (n, src_addr),
Err(_) => {
if let Some(allocs) = &allocations {
if let Some(allocs) = &allocations.upgrade() {
let mut allocs = allocs.lock().await;
allocs.remove(&five_tuple);
}
Expand Down
6 changes: 3 additions & 3 deletions turn/src/allocation/permission.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use portable_atomic::AtomicBool;
use tokio::sync::Mutex;
Expand All @@ -15,7 +15,7 @@ pub(crate) const PERMISSION_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// https://tools.ietf.org/html/rfc5766#section-2.3
pub struct Permission {
pub(crate) addr: SocketAddr,
pub(crate) permissions: Option<Arc<Mutex<HashMap<String, Permission>>>>,
pub(crate) permissions: Option<Weak<Mutex<HashMap<String, Permission>>>>,
reset_tx: Option<mpsc::Sender<Duration>>,
timer_expired: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Permission {
while !done {
tokio::select! {
_ = &mut timer => {
if let Some(perms) = &permissions{
if let Some(perms) = &permissions.clone().and_then(|x| x.upgrade()) {
let mut p = perms.lock().await;
p.remove(&addr2ipfingerprint(&addr));
}
Expand Down
Loading