Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: buffer robustness #3743

5 changes: 5 additions & 0 deletions src/core/Settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,11 @@ function Settings() {
rtpSafetyFactor: 5,
mode: Constants.CMCD_MODE_QUERY
}
},
errors: {
recoverAttempts: {
mediaErrorDecode: 5
}
}
};

Expand Down
3 changes: 3 additions & 0 deletions src/core/events/CoreEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ class CoreEvents extends EventsBase {
this.MEDIA_FRAGMENT_LOADED = 'mediaFragmentLoaded';
this.MEDIA_FRAGMENT_NEEDED = 'mediaFragmentNeeded';
this.QUOTA_EXCEEDED = 'quotaExceeded';
this.SEGMENT_LOCATION_BLACKLIST_ADD = 'segmentLocationBlacklistAdd';
this.SEGMENT_LOCATION_BLACKLIST_CHANGED = 'segmentLocationBlacklistChanged';
this.SERVICE_LOCATION_BLACKLIST_ADD = 'serviceLocationBlacklistAdd';
this.SERVICE_LOCATION_BLACKLIST_CHANGED = 'serviceLocationBlacklistChanged';
this.SET_FRAGMENTED_TEXT_AFTER_DISABLED = 'setFragmentedTextAfterDisabled';
this.SET_NON_FRAGMENTED_TEXT = 'setNonFragmentedText';
this.SOURCE_BUFFER_ERROR = 'sourceBufferError';
this.STREAMS_COMPOSED = 'streamsComposed';
this.STREAM_BUFFERING_COMPLETED = 'streamBufferingCompleted';
this.STREAM_REQUESTING_COMPLETED = 'streamRequestingCompleted';
Expand Down
72 changes: 41 additions & 31 deletions src/streaming/SourceBufferSink.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Errors from '../core/errors/Errors';
import Settings from '../core/Settings';
import constants from './constants/Constants';
import {HTTPRequest} from './vo/metrics/HTTPRequest';
import Events from '../core/events/Events';

const APPEND_WINDOW_START_OFFSET = 0.1;
const APPEND_WINDOW_END_OFFSET = 0.01;
Expand All @@ -51,6 +52,7 @@ function SourceBufferSink(config) {
const context = this.context;
const settings = Settings(context).getInstance();
const textController = config.textController;
const eventBus = config.eventBus;

let instance,
type,
Expand All @@ -63,6 +65,7 @@ function SourceBufferSink(config) {
let appendQueue = [];
let isAppendingInProgress = false;
let mediaSource = config.mediaSource;
let lastRequestAppended = null;

function setup() {
logger = Debug(context).getInstance().getLogger(instance);
Expand Down Expand Up @@ -91,7 +94,7 @@ function SourceBufferSink(config) {

function changeType(codec) {
return new Promise((resolve) => {
waitForUpdateEnd(() => {
_waitForUpdateEnd(() => {
if (buffer.changeType) {
buffer.changeType(codec);
}
Expand Down Expand Up @@ -149,17 +152,17 @@ function SourceBufferSink(config) {
// use updateend event if possible
if (typeof buffer.addEventListener === 'function') {
try {
buffer.addEventListener('updateend', updateEndHandler, false);
buffer.addEventListener('error', errHandler, false);
buffer.addEventListener('abort', errHandler, false);
buffer.addEventListener('updateend', _updateEndHandler, false);
buffer.addEventListener('error', _errHandler, false);
buffer.addEventListener('abort', _errHandler, false);

} catch (err) {
// use setInterval to periodically check if updating has been completed
intervalId = setInterval(updateEndHandler, CHECK_INTERVAL);
intervalId = setInterval(_updateEndHandler, CHECK_INTERVAL);
}
} else {
// use setInterval to periodically check if updating has been completed
intervalId = setInterval(updateEndHandler, CHECK_INTERVAL);
intervalId = setInterval(_updateEndHandler, CHECK_INTERVAL);
}
}

Expand All @@ -170,9 +173,9 @@ function SourceBufferSink(config) {
function _removeEventListeners() {
try {
if (typeof buffer.removeEventListener === 'function') {
buffer.removeEventListener('updateend', updateEndHandler, false);
buffer.removeEventListener('error', errHandler, false);
buffer.removeEventListener('abort', errHandler, false);
buffer.removeEventListener('updateend', _updateEndHandler, false);
buffer.removeEventListener('error', _errHandler, false);
buffer.removeEventListener('abort', _errHandler, false);
}
clearInterval(intervalId);
} catch (e) {
Expand All @@ -188,7 +191,7 @@ function SourceBufferSink(config) {
return;
}

waitForUpdateEnd(() => {
_waitForUpdateEnd(() => {
try {
if (!buffer) {
resolve();
Expand Down Expand Up @@ -227,7 +230,7 @@ function SourceBufferSink(config) {
return;
}

waitForUpdateEnd(() => {
_waitForUpdateEnd(() => {
try {
if (buffer.timestampOffset !== MSETimeOffset && !isNaN(MSETimeOffset)) {
buffer.timestampOffset = MSETimeOffset;
Expand Down Expand Up @@ -258,6 +261,7 @@ function SourceBufferSink(config) {
}
buffer = null;
}
lastRequestAppended = null;
}

function getBuffer() {
Expand All @@ -273,7 +277,7 @@ function SourceBufferSink(config) {
}
}

function append(chunk) {
function append(chunk, request = null) {
return new Promise((resolve, reject) => {
if (!chunk) {
reject({
Expand All @@ -282,14 +286,14 @@ function SourceBufferSink(config) {
});
return;
}
appendQueue.push({ data: chunk, promise: { resolve, reject } });
waitForUpdateEnd(appendNextInQueue.bind(this));
appendQueue.push({ data: chunk, promise: { resolve, reject }, request });
_waitForUpdateEnd(_appendNextInQueue.bind(this));
});
}

function _abortBeforeAppend() {
return new Promise((resolve) => {
waitForUpdateEnd(() => {
_waitForUpdateEnd(() => {
// Save the append window, which is reset on abort().
const appendWindowStart = buffer.appendWindowStart;
const appendWindowEnd = buffer.appendWindowEnd;
Expand All @@ -313,11 +317,11 @@ function SourceBufferSink(config) {
return;
}

waitForUpdateEnd(function () {
_waitForUpdateEnd(function () {
try {
buffer.remove(start, end);
// updating is in progress, we should wait for it to complete before signaling that this operation is done
waitForUpdateEnd(function () {
_waitForUpdateEnd(function () {
resolve({
from: start,
to: end,
Expand All @@ -342,7 +346,7 @@ function SourceBufferSink(config) {
});
}

function appendNextInQueue() {
function _appendNextInQueue() {
if (isAppendingInProgress) {
return;
}
Expand All @@ -355,7 +359,7 @@ function SourceBufferSink(config) {
const afterSuccess = function () {
isAppendingInProgress = false;
if (appendQueue.length > 0) {
appendNextInQueue.call(this);
_appendNextInQueue.call(this);
}
// Init segments are cached. In any other case we dont need the chunk bytes anymore and can free the memory
if (nextChunk && nextChunk.data && nextChunk.data.segmentType && nextChunk.data.segmentType !== HTTPRequest.INIT_SEGMENT_TYPE) {
Expand All @@ -365,6 +369,7 @@ function SourceBufferSink(config) {
};

try {
lastRequestAppended = nextChunk.request;
if (nextChunk.data.bytes.byteLength === 0) {
afterSuccess.call(this);
} else {
Expand All @@ -374,12 +379,12 @@ function SourceBufferSink(config) {
buffer.append(nextChunk.data.bytes, nextChunk.data);
}
// updating is in progress, we should wait for it to complete before signaling that this operation is done
waitForUpdateEnd(afterSuccess.bind(this));
_waitForUpdateEnd(afterSuccess.bind(this));
}
} catch (err) {
logger.fatal('SourceBuffer append failed "' + err + '"');
if (appendQueue.length > 0) {
appendNextInQueue();
_appendNextInQueue();
} else {
isAppendingInProgress = false;
}
Expand All @@ -395,7 +400,7 @@ function SourceBufferSink(config) {
try {
appendQueue = [];
if (mediaSource.readyState === 'open') {
waitForUpdateEnd(() => {
_waitForUpdateEnd(() => {
buffer.abort();
resolve();
});
Expand All @@ -412,36 +417,42 @@ function SourceBufferSink(config) {

}

function executeCallback() {
function _executeCallback() {
if (callbacks.length > 0) {
if (!buffer.updating) {
const cb = callbacks.shift();
cb();
// Try to execute next callback if still not updating
executeCallback();
_executeCallback();
}
}
}

function updateEndHandler() {
function _updateEndHandler() {
// if updating is still in progress do nothing and wait for the next check again.
if (buffer.updating) {
return;
}

// updating is completed, now we can stop checking and resolve the promise
executeCallback();
_executeCallback();
}

function errHandler() {
logger.error('SourceBufferSink error');
function _errHandler(e) {
const error = e.target || {};
_triggerEvent(Events.SOURCE_BUFFER_ERROR, { error, lastRequestAppended })
}

function waitForUpdateEnd(callback) {
function _triggerEvent(eventType, data) {
let payload = data || {};
eventBus.trigger(eventType, payload, { streamId: mediaInfo.streamInfo.id, mediaType: type });
}

function _waitForUpdateEnd(callback) {
callbacks.push(callback);

if (!buffer.updating) {
executeCallback();
_executeCallback();
}
}

Expand All @@ -454,7 +465,6 @@ function SourceBufferSink(config) {
abort,
reset,
updateTimestampOffset,
waitForUpdateEnd,
initializeForStreamSwitch,
initializeForFirstUse,
updateAppendWindow,
Expand Down
43 changes: 28 additions & 15 deletions src/streaming/Stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import FactoryMaker from '../core/FactoryMaker';
import DashJSError from './vo/DashJSError';
import BoxParser from './utils/BoxParser';
import URLUtils from './utils/URLUtils';
import BlacklistController from './controllers/BlacklistController';


const MEDIA_TYPES = [Constants.VIDEO, Constants.AUDIO, Constants.TEXT, Constants.MUXED, Constants.IMAGE];
Expand Down Expand Up @@ -84,6 +85,7 @@ function Stream(config) {
isUpdating,
fragmentController,
thumbnailController,
segmentBlacklistController,
preloaded,
boxParser,
debug,
Expand All @@ -101,6 +103,11 @@ function Stream(config) {

boxParser = BoxParser(context).getInstance();

segmentBlacklistController = BlacklistController(context).create({
updateEventName: Events.SEGMENT_LOCATION_BLACKLIST_CHANGED,
addBlacklistEventName: Events.SEGMENT_LOCATION_BLACKLIST_ADD
});

fragmentController = FragmentController(context).create({
streamInfo: streamInfo,
mediaPlayerModel: mediaPlayerModel,
Expand Down Expand Up @@ -430,24 +437,25 @@ function Stream(config) {
const isFragmented = mediaInfo ? mediaInfo.isFragmented : null;

let streamProcessor = StreamProcessor(context).create({
streamInfo: streamInfo,
type: type,
mimeType: mimeType,
timelineConverter: timelineConverter,
adapter: adapter,
manifestModel: manifestModel,
mediaPlayerModel: mediaPlayerModel,
fragmentModel: fragmentModel,
streamInfo,
type,
mimeType,
timelineConverter,
adapter,
manifestModel,
mediaPlayerModel,
fragmentModel,
dashMetrics: config.dashMetrics,
baseURLController: config.baseURLController,
segmentBaseController: config.segmentBaseController,
abrController: abrController,
playbackController: playbackController,
mediaController: mediaController,
textController: textController,
errHandler: errHandler,
settings: settings,
boxParser: boxParser
abrController,
playbackController,
mediaController,
textController,
errHandler,
settings,
boxParser,
segmentBlacklistController
});

streamProcessor.initialize(mediaSource, hasVideoTrack, isFragmented);
Expand Down Expand Up @@ -560,6 +568,11 @@ function Stream(config) {
abrController.clearDataForStream(streamInfo.id);
}

if (segmentBlacklistController) {
segmentBlacklistController.reset();
segmentBlacklistController = null;
}

resetInitialSettings(keepBuffers);

streamInfo = null;
Expand Down
Loading