Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] refactor Group::recv() base on new rcv buffer to support message mode #2218

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo);

const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder;
// Remember if we actually read out of order packet.
const bool readingOutOfOrderPacket = !canReadInOrder || m_iStartPos == m_iFirstReadableOutOfOrder;

size_t remain = len;
char* dst = data;
Expand Down Expand Up @@ -313,13 +311,14 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
const bool pbLast = packet.getMsgBoundary() & PB_LAST;
if (msgctrl && (packet.getMsgBoundary() & PB_FIRST))
{
msgctrl->pktseq = pktseqno;
msgctrl->msgno = packet.getMsgSeq(m_bPeerRexmitFlag);
}
if (msgctrl && pbLast)
{
msgctrl->srctime = count_microseconds(getPktTsbPdTime(packet.getMsgTimeStamp()).time_since_epoch());
}
if (msgctrl)
msgctrl->pktseq = pktseqno;

releaseUnitInPos(i);
if (updateStartPos)
Expand All @@ -344,8 +343,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
}

countBytes(-pkts_read, -bytes_extracted);
if (!m_tsbpd.isEnabled() && readingOutOfOrderPacket)
updateFirstReadableOutOfOrder();

releaseNextFillerEntries();

Expand All @@ -355,6 +352,11 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
//updateNonreadPos();
}

if (!m_tsbpd.isEnabled())
// We need updateFirstReadableOutOfOrder() here even if we are reading inorder,
// incase readable inorder packets are all read out.
updateFirstReadableOutOfOrder();

const int bytes_read = dst - data;
if (bytes_read < bytes_extracted)
{
Expand Down Expand Up @@ -588,6 +590,41 @@ bool CRcvBufferNew::isRcvDataReady(time_point time_now) const
return info.tsbpd_time <= time_now;
}

CRcvBufferNew::PacketInfo CRcvBufferNew::getFirstReadablePacketInfo(time_point time_now) const
{
const PacketInfo unreadableInfo = {SRT_SEQNO_NONE, false, time_point()};
const bool hasInorderPackets = hasReadableInorderPkts();

if (!m_tsbpd.isEnabled())
{
if (hasInorderPackets)
{
const CPacket& packet = m_entries[m_iStartPos].pUnit->m_Packet;
const PacketInfo info = {packet.getSeqNo(), false, time_point()};
return info;
}
SRT_ASSERT((!m_bMessageAPI && m_numOutOfOrderPackets == 0) || m_bMessageAPI);
if (m_iFirstReadableOutOfOrder >= 0)
{
SRT_ASSERT(m_numOutOfOrderPackets > 0);
const CPacket& packet = m_entries[m_iFirstReadableOutOfOrder].pUnit->m_Packet;
const PacketInfo info = {packet.getSeqNo(), true, time_point()};
return info;
}
return unreadableInfo;
}

if (!hasInorderPackets)
return unreadableInfo;

const PacketInfo info = getFirstValidPacketInfo();

if (info.tsbpd_time <= time_now)
return info;
else
return unreadableInfo;
}

void CRcvBufferNew::countBytes(int pkts, int bytes)
{
ScopedLock lock(m_BytesCountLock);
Expand Down
2 changes: 2 additions & 0 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class CRcvBufferNew
/// IF skipseqno == -1, no missing packet but 1st not ready to play.
PacketInfo getFirstValidPacketInfo() const;

PacketInfo getFirstReadablePacketInfo(time_point time_now) const;

/// Get information on packets available to be read.
/// @returns a pair of sequence numbers (first available; first unavailable).
///
Expand Down
188 changes: 188 additions & 0 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,193 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno)
return false;
}

#ifdef ENABLE_NEW_RCVBUFFER
int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
{
// First, acquire GlobControlLock to make sure all member sockets still exist
enterCS(m_Global.m_GlobControlLock);
ScopedLock guard(m_GroupLock);

if (m_bClosing)
{
// The group could be set closing in the meantime, but if
// this is only about to be set by another thread, this thread
// must fist wait for being able to acquire this lock.
// The group will not be deleted now because it is added usage counter
// by this call, but will be released once it exits.
leaveCS(m_Global.m_GlobControlLock);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}

// Now, still under lock, check if all sockets still can be dispatched
send_CheckValidSockets();
leaveCS(m_Global.m_GlobControlLock);

if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

// Later iteration over it might be less efficient than
// by vector, but we'll also often try to check a single id
// if it was ever seen broken, so that it's skipped.
set<CUDTSocket*> broken;

for (;;)
{
if (!m_bOpened || !m_bConnected)
{
LOGC(grlog.Error,
log << boolalpha << "grp/recv: $" << id() << ": ABANDONING: opened=" << m_bOpened
<< " connected=" << m_bConnected);
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}

vector<CUDTSocket*> aliveMembers;
recv_CollectAliveAndBroken(aliveMembers, broken);
if (aliveMembers.empty())
{
LOGC(grlog.Error, log << "grp/recv: ALL LINKS BROKEN, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}

vector<CUDTSocket*> readySockets;
if (m_bSynRecving)
readySockets = recv_WaitForReadReady(aliveMembers, broken);
else
readySockets = aliveMembers;

if (m_bClosing)
{
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": GROUP CLOSED, ABANDONING.");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}

// Find the first readable packet among all member sockets.
CUDTSocket* socketToRead = NULL;
CRcvBufferNew::PacketInfo infoToRead = {-1, false, time_point()};
for (vector<CUDTSocket*>::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si)
{
CUDTSocket* ps = *si;

ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
// Drop here to make sure the getFirstReadablePacketInfo() below return fresher packet.
int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets before reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}

const CRcvBufferNew::PacketInfo info =
ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now());
if (info.seqno == SRT_SEQNO_NONE)
{
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read.");
continue;
}
// We need to qualify the sequence, just for a case.
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, info.seqno))
{
LOGC(grlog.Error,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": SEQUENCE DISCREPANCY: base=%"
<< m_RcvBaseSeqNo << " vs pkt=%" << info.seqno << ", setting ESECFAIL");
ps->core().m_bBroken = true;
broken.insert(ps);
continue;
}
if (socketToRead == NULL || CSeqNo::seqcmp(info.seqno, infoToRead.seqno) < 0)
{
socketToRead = ps;
infoToRead = info;
}
}

if (socketToRead == NULL)
{
if (m_bSynRecving)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": No links reported any fresher packet, re-polling.");
continue;
}
else
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": No links reported any fresher packet, clearing readiness.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
}
else
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": Found first readable packet from @" << socketToRead->m_SocketID
<< ": seq=" << infoToRead.seqno << " gap=" << infoToRead.seq_gap
<< " time=" << FormatTime(infoToRead.tsbpd_time));
}

const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), CUDTUnited::ERH_RETURN);
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Extracted data with %"
<< w_mc.pktseq << " #" << w_mc.msgno << ": " << (res <= 0 ? "(NOTHING)" : BufferStamp(buf, res)));
if (res == 0)
{
LOGC(grlog.Warn,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Retrying next socket...");
// This socket will not be socketToRead in the next turn because receiveMessage() return 0 here.
continue;
}
if (res == SRT_ERROR)
{
LOGC(grlog.Warn,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": " << srt_getlasterror_str()
<< ". Retrying next socket...");
broken.insert(socketToRead);
continue;
}
fillGroupData((w_mc), w_mc);

HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq);
m_RcvBaseSeqNo = w_mc.pktseq;

// Update stats as per delivery
m_stats.recv.count(res);
updateAvgPayloadSize(res);

for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}
}
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
if (!ps->core().isRcvBufferReady())
m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);
}

return res;
}
LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
#else
// The "app reader" version of the reading function.
// This reads the packets from every socket treating them as independent
// and prepared to work with the application. Then packets are sorted out
Expand Down Expand Up @@ -2731,6 +2918,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
}
}
}
#endif

// [[using locked(m_GroupLock)]]
CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead()
Expand Down