Skip to content

Commit 0f0caf9

Browse files
authored
[core] Added atomic support and marked atomic key fields detected by thread sanitizer (#1863)
1 parent e37f4ab commit 0f0caf9

11 files changed

+677
-112
lines changed

srtcore/api.cpp

+32-13
Original file line numberDiff line numberDiff line change
@@ -2641,13 +2641,17 @@ void srt::CUDTUnited::checkBrokenSockets()
26412641
// NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
26422642
// this function is called (isRcvDataReady also checks if the
26432643
// available data is "ready to play").
2644-
&& s->m_pUDT->m_pRcvBuffer->isRcvDataAvailable()
2645-
&& (s->m_pUDT->m_iBrokenCounter -- > 0))
2644+
&& s->m_pUDT->m_pRcvBuffer->isRcvDataAvailable())
26462645
{
2647-
// HLOGF(smlog.Debug, "STILL KEEPING socket (still have data):
2648-
// %d\n", i->first);
2649-
// if there is still data in the receiver buffer, wait longer
2650-
continue;
2646+
const int bc = s->m_pUDT->m_iBrokenCounter.load();
2647+
if (bc > 0)
2648+
{
2649+
// HLOGF(smlog.Debug, "STILL KEEPING socket (still have data):
2650+
// %d\n", i->first);
2651+
// if there is still data in the receiver buffer, wait longer
2652+
s->m_pUDT->m_iBrokenCounter.store(bc - 1);
2653+
continue;
2654+
}
26512655
}
26522656

26532657
#if ENABLE_EXPERIMENTAL_BONDING
@@ -2702,15 +2706,17 @@ void srt::CUDTUnited::checkBrokenSockets()
27022706
// RcvUList
27032707
const steady_clock::time_point now = steady_clock::now();
27042708
const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp;
2705-
if ((closed_ago > seconds_from(1))
2706-
&& ((!j->second->m_pUDT->m_pRNode)
2707-
|| !j->second->m_pUDT->m_pRNode->m_bOnList))
2709+
if (closed_ago > seconds_from(1))
27082710
{
2709-
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
2710-
<< FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove");
2711+
CRNode* rnode = j->second->m_pUDT->m_pRNode;
2712+
if (!rnode || !rnode->m_bOnList)
2713+
{
2714+
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
2715+
<< FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove");
27112716

2712-
// HLOGF(smlog.Debug, "will unref socket: %d\n", j->first);
2713-
tbr.push_back(j->first);
2717+
// HLOGF(smlog.Debug, "will unref socket: %d\n", j->first);
2718+
tbr.push_back(j->first);
2719+
}
27142720
}
27152721
}
27162722

@@ -2734,6 +2740,19 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
27342740

27352741
CUDTSocket* const s = i->second;
27362742

2743+
// The socket may be in the trashcan now, but could
2744+
// still be under processing in the sender/receiver worker
2745+
// threads. If that's the case, SKIP IT THIS TIME. The
2746+
// socket will be checked next time the GC rollover starts.
2747+
CSNode* sn = s->m_pUDT->m_pSNode;
2748+
if (sn && sn->m_iHeapLoc != -1)
2749+
return;
2750+
2751+
CRNode* rn = s->m_pUDT->m_pRNode;
2752+
if (rn && rn->m_bOnList)
2753+
return;
2754+
2755+
27372756
#if ENABLE_EXPERIMENTAL_BONDING
27382757
if (s->m_GroupOf)
27392758
{

srtcore/api.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class CUDTSocket
102102

103103
void construct();
104104

105-
SRT_SOCKSTATUS m_Status; //< current socket state
105+
srt::sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state
106106

107107
/// Time when the socket is closed.
108108
/// When the socket is closed, it is not removed immediately from the list
@@ -421,7 +421,7 @@ friend class CRendezvousQueue;
421421
CCache<CInfoBlock>* m_pCache; // UDT network information cache
422422

423423
private:
424-
volatile bool m_bClosing;
424+
srt::sync::atomic<bool> m_bClosing;
425425
sync::Mutex m_GCStopLock;
426426
sync::Condition m_GCStopCond;
427427

srtcore/atomic.h

+210
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
//----------------------------------------------------------------------------
2+
// This is free and unencumbered software released into the public domain.
3+
//
4+
// Anyone is free to copy, modify, publish, use, compile, sell, or distribute
5+
// this software, either in source code form or as a compiled binary, for any
6+
// purpose, commercial or non-commercial, and by any means.
7+
//
8+
// In jurisdictions that recognize copyright laws, the author or authors of
9+
// this software dedicate any and all copyright interest in the software to the
10+
// public domain. We make this dedication for the benefit of the public at
11+
// large and to the detriment of our heirs and successors. We intend this
12+
// dedication to be an overt act of relinquishment in perpetuity of all present
13+
// and future rights to this software under copyright law.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19+
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20+
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
//
22+
// For more information, please refer to <http://unlicense.org/>
23+
//-----------------------------------------------------------------------------
24+
25+
// SRT Project information:
26+
// This file was adopted from a Public Domain project from
27+
// https://github.com/mbitsnbites/atomic
28+
// Only namespaces were changed to adopt it for SRT project.
29+
30+
#ifndef SRT_SYNC_ATOMIC_H_
31+
#define SRT_SYNC_ATOMIC_H_
32+
33+
// Macro for disallowing copying of an object.
34+
#if __cplusplus >= 201103L
35+
#define ATOMIC_DISALLOW_COPY(T) \
36+
T(const T&) = delete; \
37+
T& operator=(const T&) = delete;
38+
#else
39+
#define ATOMIC_DISALLOW_COPY(T) \
40+
T(const T&); \
41+
T& operator=(const T&);
42+
#endif
43+
44+
// A portable static assert.
45+
#if __cplusplus >= 201103L
46+
#define ATOMIC_STATIC_ASSERT(condition, message) \
47+
static_assert((condition), message)
48+
#else
49+
// Based on: http://stackoverflow.com/a/809465/5778708
50+
#define ATOMIC_STATIC_ASSERT(condition, message) \
51+
_impl_STATIC_ASSERT_LINE(condition, __LINE__)
52+
#define _impl_PASTE(a, b) a##b
53+
#ifdef __GNUC__
54+
#define _impl_UNUSED __attribute__((__unused__))
55+
#else
56+
#define _impl_UNUSED
57+
#endif
58+
#define _impl_STATIC_ASSERT_LINE(condition, line) \
59+
typedef char _impl_PASTE( \
60+
STATIC_ASSERT_failed_, \
61+
line)[(2 * static_cast<int>(!!(condition))) - 1] _impl_UNUSED
62+
#endif
63+
64+
#if defined(__GNUC__) || defined(__clang__) || defined(__xlc__)
65+
#define ATOMIC_USE_GCC_INTRINSICS
66+
#elif defined(_MSC_VER)
67+
#define ATOMIC_USE_MSVC_INTRINSICS
68+
#include "atomic_msvc.h"
69+
#elif __cplusplus >= 201103L
70+
#define ATOMIC_USE_CPP11_ATOMIC
71+
#include <atomic>
72+
#else
73+
#error Unsupported compiler / system.
74+
#endif
75+
76+
namespace srt {
77+
namespace sync {
78+
template <typename T>
79+
class atomic {
80+
public:
81+
ATOMIC_STATIC_ASSERT(sizeof(T) == 1 || sizeof(T) == 2 || sizeof(T) == 4 ||
82+
sizeof(T) == 8,
83+
"Only types of size 1, 2, 4 or 8 are supported");
84+
85+
atomic() : value_(static_cast<T>(0)) {}
86+
87+
explicit atomic(const T value) : value_(value) {}
88+
89+
/// @brief Performs an atomic increment operation (value + 1).
90+
/// @returns The new value of the atomic object.
91+
T operator++() {
92+
#if defined(ATOMIC_USE_GCC_INTRINSICS)
93+
return __atomic_add_fetch(&value_, 1, __ATOMIC_SEQ_CST);
94+
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
95+
return msvc::interlocked<T>::increment(&value_);
96+
#else
97+
return ++value_;
98+
#endif
99+
}
100+
101+
/// @brief Performs an atomic decrement operation (value - 1).
102+
/// @returns The new value of the atomic object.
103+
T operator--() {
104+
#if defined(ATOMIC_USE_GCC_INTRINSICS)
105+
return __atomic_sub_fetch(&value_, 1, __ATOMIC_SEQ_CST);
106+
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
107+
return msvc::interlocked<T>::decrement(&value_);
108+
#else
109+
return --value_;
110+
#endif
111+
}
112+
113+
/// @brief Performs an atomic compare-and-swap (CAS) operation.
114+
///
115+
/// The value of the atomic object is only updated to the new value if the
116+
/// old value of the atomic object matches @c expected_val.
117+
///
118+
/// @param expected_val The expected value of the atomic object.
119+
/// @param new_val The new value to write to the atomic object.
120+
/// @returns True if new_value was written to the atomic object.
121+
bool compare_exchange(const T expected_val, const T new_val) {
122+
#if defined(ATOMIC_USE_GCC_INTRINSICS)
123+
T e = expected_val;
124+
return __atomic_compare_exchange_n(
125+
&value_, &e, new_val, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
126+
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
127+
const T old_val =
128+
msvc::interlocked<T>::compare_exchange(&value_, new_val, expected_val);
129+
return (old_val == expected_val);
130+
#else
131+
T e = expected_val;
132+
return value_.compare_exchange_weak(e, new_val);
133+
#endif
134+
}
135+
136+
/// @brief Performs an atomic set operation.
137+
///
138+
/// The value of the atomic object is unconditionally updated to the new
139+
/// value.
140+
///
141+
/// @param new_val The new value to write to the atomic object.
142+
void store(const T new_val) {
143+
#if defined(ATOMIC_USE_GCC_INTRINSICS)
144+
__atomic_store_n(&value_, new_val, __ATOMIC_SEQ_CST);
145+
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
146+
(void)msvc::interlocked<T>::exchange(&value_, new_val);
147+
#else
148+
value_.store(new_val);
149+
#endif
150+
}
151+
152+
/// @returns the current value of the atomic object.
153+
/// @note Be careful about how this is used, since any operations on the
154+
/// returned value are inherently non-atomic.
155+
T load() const {
156+
#if defined(ATOMIC_USE_GCC_INTRINSICS)
157+
return __atomic_load_n(&value_, __ATOMIC_SEQ_CST);
158+
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
159+
// TODO(m): Is there a better solution for MSVC?
160+
return value_;
161+
#else
162+
return value_;
163+
#endif
164+
}
165+
166+
/// @brief Performs an atomic exchange operation.
167+
///
168+
/// The value of the atomic object is unconditionally updated to the new
169+
/// value, and the old value is returned.
170+
///
171+
/// @param new_val The new value to write to the atomic object.
172+
/// @returns the old value.
173+
T exchange(const T new_val) {
174+
#if defined(ATOMIC_USE_GCC_INTRINSICS)
175+
return __atomic_exchange_n(&value_, new_val, __ATOMIC_SEQ_CST);
176+
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
177+
return msvc::interlocked<T>::exchange(&value_, new_val);
178+
#else
179+
return value_.exchange(new_val);
180+
#endif
181+
}
182+
183+
T operator=(const T new_value) {
184+
store(new_value);
185+
return new_value;
186+
}
187+
188+
operator T() const {
189+
return load();
190+
}
191+
192+
private:
193+
#if defined(ATOMIC_USE_GCC_INTRINSICS) || defined(ATOMIC_USE_MSVC_INTRINSICS)
194+
volatile T value_;
195+
#else
196+
std::atomic<T> value_;
197+
#endif
198+
199+
ATOMIC_DISALLOW_COPY(atomic)
200+
};
201+
202+
} // namespace sync
203+
} // namespace srt
204+
205+
// Undef temporary defines.
206+
#undef ATOMIC_USE_GCC_INTRINSICS
207+
#undef ATOMIC_USE_MSVC_INTRINSICS
208+
#undef ATOMIC_USE_CPP11_ATOMIC
209+
210+
#endif // ATOMIC_ATOMIC_H_

0 commit comments

Comments
 (0)