Skip to content

Commit 922f8fc

Browse files
committed
[core] drop packets in the new recv buffer by group recv base
1 parent 3d26644 commit 922f8fc

File tree

3 files changed

+49
-15
lines changed

3 files changed

+49
-15
lines changed

srtcore/buffer_rcv.cpp

+8-15
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,10 @@ int CRcvBufferNew::insert(CUnit* unit)
161161

162162
int CRcvBufferNew::dropUpTo(int32_t seqno)
163163
{
164-
// Can drop only when nothing to read, and
165-
// first unacknowledged packet is missing.
166-
SRT_ASSERT(m_iStartPos == m_iFirstNonreadPos);
167-
168164
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
169165
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);
170166

171167
int len = CSeqNo::seqoff(m_iStartSeqNo, seqno);
172-
SRT_ASSERT(len > 0);
173168
if (len <= 0)
174169
{
175170
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
@@ -180,34 +175,32 @@ int CRcvBufferNew::dropUpTo(int32_t seqno)
180175
if (m_iMaxPosInc < 0)
181176
m_iMaxPosInc = 0;
182177

183-
// Check that all packets being dropped are missing.
184178
const int iDropCnt = len;
185179
while (len > 0)
186180
{
187-
if (m_entries[m_iStartPos].pUnit != NULL)
181+
CUnit* pUnit = m_entries[m_iStartPos].pUnit;
182+
if (pUnit != NULL)
188183
{
184+
if (!m_tsbpd.isEnabled() && m_bMessageAPI && !pUnit->m_Packet.getMsgOrderFlag())
185+
--m_numOutOfOrderPackets;
189186
releaseUnitInPos(m_iStartPos);
190187
}
191-
192-
if (m_entries[m_iStartPos].status != EntryState_Empty)
193-
{
194-
SRT_ASSERT(m_entries[m_iStartPos].status == EntryState_Drop || m_entries[m_iStartPos].status == EntryState_Read);
195-
m_entries[m_iStartPos].status = EntryState_Empty;
196-
}
197-
188+
m_entries[m_iStartPos].status = EntryState_Empty;
198189
SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty);
199190
m_iStartPos = incPos(m_iStartPos);
200191
--len;
201192
}
202193

203194
// Update positions
204195
m_iStartSeqNo = seqno;
205-
// Move forward if there are "read" entries.
196+
// Move forward if there are "read/drop" entries.
206197
releaseNextFillerEntries();
207198
// Set nonread position to the starting position before updating,
208199
// because start position was increased, and preceeding packets are invalid.
209200
m_iFirstNonreadPos = m_iStartPos;
210201
updateNonreadPos();
202+
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
203+
updateFirstReadableOutOfOrder();
211204
return iDropCnt;
212205
}
213206

srtcore/core.cpp

+34
Original file line numberDiff line numberDiff line change
@@ -7706,6 +7706,36 @@ int32_t srt::CUDT::ackDataUpTo(int32_t ack)
77067706
#endif
77077707
}
77087708

7709+
#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER
7710+
void srt::CUDT::dropToGroupRecvBase() {
7711+
int32_t group_recv_base = SRT_SEQNO_NONE;
7712+
if (m_parent->m_GroupOf)
7713+
{
7714+
// Check is first done before locking to avoid unnecessary
7715+
// mutex locking. The condition for this field is that it
7716+
// can be either never set, already reset, or ever set
7717+
// and possibly dangling. The re-check after lock eliminates
7718+
// the dangling case.
7719+
ScopedLock glock (uglobal().m_GlobControlLock);
7720+
7721+
// Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock,
7722+
// but this is an intended order.
7723+
if (m_parent->m_GroupOf)
7724+
group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo();
7725+
}
7726+
if (group_recv_base == SRT_SEQNO_NONE)
7727+
return;
7728+
7729+
ScopedLock lck(m_RcvBufferLock);
7730+
dropTooLateUpTo(CSeqNo::incseq(group_recv_base));
7731+
if (CSeqNo::seqcmp(group_recv_base, m_iRcvCurrSeqNo) > 0)
7732+
{
7733+
LOGC(xtlog.Note, log << "dropToGroupRecvBase: m_iRcvCurrSeqNo: %" << m_iRcvCurrSeqNo << " -> %" << group_recv_base );
7734+
m_iRcvCurrSeqNo = group_recv_base;
7735+
}
7736+
}
7737+
#endif
7738+
77097739
namespace srt {
77107740
#if ENABLE_HEAVY_LOGGING
77117741
static void DebugAck(string hdr, int prev, int ack)
@@ -7901,6 +7931,10 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
79017931
string reason = "first lost"; // just for "a reason" of giving particular % for ACK
79027932
#endif
79037933

7934+
#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER
7935+
dropToGroupRecvBase();
7936+
#endif
7937+
79047938
{
79057939
// If there is no loss, the ACK is the current largest sequence number plus 1;
79067940
// Otherwise it is the smallest sequence number in the receiver loss list.

srtcore/core.h

+7
Original file line numberDiff line numberDiff line change
@@ -1060,6 +1060,13 @@ class CUDT
10601060
/// @return
10611061
int32_t ackDataUpTo(int32_t seq);
10621062

1063+
#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER
1064+
/// @brief Drop packets in the recv buffer behind group_recv_base.
1065+
/// Updates m_iRcvLastSkipAck if it's behind group_recv_base.
1066+
/// Updates m_iRcvCurrSeqNo if it's behind group_recv_base.
1067+
void dropToGroupRecvBase();
1068+
#endif
1069+
10631070
void handleKeepalive(const char* data, size_t lenghth);
10641071

10651072
/// Locks m_RcvBufferLock and retrieves the available size of the receiver buffer.

0 commit comments

Comments
 (0)