From 1246989ea94b88f313b99d5c8c4038d92b02f0b5 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 22 Feb 2015 10:45:13 +0800 Subject: [PATCH] for #179, refine dvr, support callback when reap dvr segment. --- trunk/conf/full.conf | 5 +- trunk/research/api-server/server.py | 27 +++- trunk/src/app/srs_app_config.cpp | 7 + trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_dvr.cpp | 183 +++++++++++++++++++++++---- trunk/src/app/srs_app_dvr.hpp | 33 ++--- trunk/src/app/srs_app_http_api.cpp | 10 +- trunk/src/app/srs_app_http_hooks.cpp | 56 ++++++++ trunk/src/app/srs_app_http_hooks.hpp | 9 ++ 9 files changed, 281 insertions(+), 50 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index cc6feb699b..41e1040a2a 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -297,7 +297,8 @@ vhost dvr.srs.com { # vhost, query all dvr of this vhost. # response in json, where: # {code:0, dvrs: [{path_tmpl:"./[15].[04].[05].[999].flv", path_dvr:"./22.7.43.312.flv", - # wait_keyframe:true, vhost:"__defaultVhost", callback:"http://dvr/callback" + # wait_keyframe:true, vhost:"__defaultVhost", callback:"http://127.0.0.1:8085/api/v1/dvrs", + # status:"stop"|"start" # }]} # method=POST # to start dvr of specified vhost. @@ -313,6 +314,8 @@ vhost dvr.srs.com { # vhost, stop all dvr of this vhost. # response in json, where: # {code:0} + # when reap segment, the callback POST request in json: + # {action:"on_dvr_reap_segment"} # default: session dvr_plan session; # the dvr output path. diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index 8c1f4823b5..5fcd0ea63a 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -253,7 +253,7 @@ def GET(self): return json.dumps(dvrs) ''' - for SRS hook: on_dvr + for SRS hook: on_dvr, on_dvr_reap_segment on_dvr: when srs reap a dvr file, call the hook, the request in the POST data string is a object encode by json: @@ -265,6 +265,17 @@ def GET(self): "cwd": "/usr/local/srs", "file": "./objs/nginx/html/live/livestream.1420254068776.flv" } + on_dvr_reap_segment: + when api dvr specifes the callback when reap flv segment, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_dvr_reap_segment", + "client_id": 1985, + "vhost": "video.test.com", "app": "live", + "stream": "livestream", + "cwd": "/usr/local/srs", + "file": "./objs/nginx/html/live/livestream.1420254068776.flv" + } if valid, the hook must return HTTP code 200(Stauts OK) and response an int value specifies the error code(0 corresponding to success): 0 @@ -287,6 +298,8 @@ def POST(self): action = json_req["action"] if action == "on_dvr": code = self.__on_dvr(json_req) + if action == "on_dvr_reap_segment": + code = self.__on_dvr_reap_segment(json_req) else: trace("invalid request action: %s"%(json_req["action"])) code = Error.request_invalid_action @@ -308,6 +321,18 @@ def __on_dvr(self, req): return code + def __on_dvr_reap_segment(self, req): + code = Error.success + + trace("srs %s: client id=%s, vhost=%s, app=%s, stream=%s, cwd=%s, file=%s"%( + req["action"], req["client_id"], req["vhost"], req["app"], req["stream"], + req["cwd"], req["file"] + )) + + # TODO: process the on_dvr event + + return code + ''' handle the sessions requests: client play/stop stream ''' diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index c0059ad8cd..b0f8219c1c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3342,6 +3342,13 @@ bool SrsConfig::get_dvr_enabled(string vhost) return false; } +void SrsConfig::set_dvr_enabled(string vhost, bool enabled) +{ + SrsConfDirective* conf = create_directive(vhost, "dvr", "enabled"); + conf->args.clear(); + conf->args.push_back(enabled? "on":"off"); +} + string SrsConfig::get_dvr_path(string vhost) { SrsConfDirective* dvr = get_dvr(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 827daf2ac8..e2e595ef49 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -923,6 +923,7 @@ class SrsConfig * whether dvr is enabled. */ virtual bool get_dvr_enabled(std::string vhost); + virtual void set_dvr_enabled(std::string vhost, bool enabled); /** * get the dvr path, the flv file to save in. */ diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 494f06ca9b..c5a325361f 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -144,15 +144,15 @@ int SrsFlvSegment::open(bool use_tmp_file) return ret; } } + + // initialize the encoder. + if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) { + srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret); + return ret; + } // when exists, donot write flv header. if (fresh_flv_file) { - // initialize the encoder. - if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) { - srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret); - return ret; - } - // write the flv header to writer. if ((ret = enc->write_header()) != ERROR_SUCCESS) { srs_error("write flv header failed. ret=%d", ret); @@ -195,6 +195,8 @@ int SrsFlvSegment::close() } } + // TODO: FIXME: the http callback is async, which will trigger thread switch, + // so the on_video maybe invoked during the http callback, and error. if ((ret = plan->on_reap_segment()) != ERROR_SUCCESS) { srs_error("dvr: notify plan to reap segment failed. ret=%d", ret); return ret; @@ -743,10 +745,15 @@ SrsDvrApiPlan::SrsDvrApiPlan() { autostart = false; started = false; + + metadata = sh_audio = sh_video = NULL; } SrsDvrApiPlan::~SrsDvrApiPlan() { + srs_freep(metadata); + srs_freep(sh_audio); + srs_freep(sh_video); } int SrsDvrApiPlan::initialize(SrsSource* s, SrsRequest* r) @@ -787,6 +794,8 @@ int SrsDvrApiPlan::on_publish() return ret; } + dvr_enabled = true; + if ((ret = segment->close()) != ERROR_SUCCESS) { return ret; } @@ -794,8 +803,17 @@ int SrsDvrApiPlan::on_publish() if ((ret = segment->open()) != ERROR_SUCCESS) { return ret; } - - dvr_enabled = true; + + // update sequence header + if (metadata && (ret = SrsDvrPlan::on_meta_data(metadata)) != ERROR_SUCCESS) { + return ret; + } + if (sh_video && (ret = SrsDvrPlan::on_video(sh_video)) != ERROR_SUCCESS) { + return ret; + } + if (sh_audio && (ret = SrsDvrPlan::on_audio(sh_audio)) != ERROR_SUCCESS) { + return ret; + } return ret; } @@ -804,6 +822,48 @@ void SrsDvrApiPlan::on_unpublish() { } +int SrsDvrApiPlan::on_meta_data(SrsSharedPtrMessage* __metadata) +{ + int ret = ERROR_SUCCESS; + + srs_freep(metadata); + metadata = __metadata->copy(); + + return ret; +} + +int SrsDvrApiPlan::on_audio(SrsSharedPtrMessage* __audio) +{ + int ret = ERROR_SUCCESS; + + if (SrsFlvCodec::audio_is_sequence_header(__audio->payload, __audio->size)) { + srs_freep(sh_audio); + sh_audio = __audio->copy(); + } + + if ((ret = SrsDvrPlan::on_audio(__audio)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDvrApiPlan::on_video(SrsSharedPtrMessage* __video) +{ + int ret = ERROR_SUCCESS; + + if (SrsFlvCodec::video_is_sequence_header(__video->payload, __video->size)) { + srs_freep(sh_video); + sh_video = __video->copy(); + } + + if ((ret = SrsDvrPlan::on_video(__video)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + int SrsDvrApiPlan::set_path_tmpl(string path_tmpl) { _srs_config->set_dvr_path(req->vhost, path_tmpl); @@ -830,6 +890,9 @@ int SrsDvrApiPlan::start() return ret; } + // enable the config. + _srs_config->set_dvr_enabled(req->vhost, true); + // stop dvr if (dvr_enabled) { // ignore error. @@ -862,16 +925,58 @@ int SrsDvrApiPlan::dumps(stringstream& ss) << __SRS_JFIELD_STR("path_dvr", segment->get_path()) << __SRS_JFIELD_CONT << __SRS_JFIELD_BOOL("wait_keyframe", wait_keyframe) << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("vhost", req->vhost) << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("callback", callback) + << __SRS_JFIELD_STR("callback", callback) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("status", (dvr_enabled? "start":"stop")) << __SRS_JOBJECT_END; return ret; } +int SrsDvrApiPlan::stop() +{ + int ret = ERROR_SUCCESS; + + _srs_config->set_dvr_enabled(req->vhost, false); + started = false; + + // stop dvr + if (dvr_enabled) { + // ignore error. + int ret = segment->close(); + if (ret != ERROR_SUCCESS) { + srs_warn("ignore flv close error. ret=%d", ret); + } + + dvr_enabled = false; + } + + srs_trace("dvr: stop dvr of vhost=%s", req->vhost.c_str()); + + return ret; +} + int SrsDvrApiPlan::on_reap_segment() { - // TODO: FIXME: implements it. - return ERROR_SUCCESS; + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_CALLBACK + // HTTP: callback + if (callback.empty()) { + srs_warn("dvr: ignore for callback empty, vhost=%s", req->vhost.c_str()); + return ret; + } + + int connection_id = _srs_context->get_id(); + std::string cwd = _srs_config->cwd(); + std::string file = segment->get_path(); + std::string url = callback; + if ((ret = SrsHttpHooks::on_dvr_reap_segment(url, connection_id, req, cwd, file)) != ERROR_SUCCESS) { + srs_error("hook client on_dvr_reap_segment failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } +#endif + + return ret; } SrsDvrAppendPlan::SrsDvrAppendPlan() @@ -909,30 +1014,30 @@ void SrsDvrAppendPlan::on_unpublish() { } -int SrsDvrAppendPlan::on_audio(SrsSharedPtrMessage* audio) +int SrsDvrAppendPlan::on_audio(SrsSharedPtrMessage* __audio) { int ret = ERROR_SUCCESS; - if ((ret = update_duration(audio)) != ERROR_SUCCESS) { + if ((ret = update_duration(__audio)) != ERROR_SUCCESS) { return ret; } - if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) { + if ((ret = SrsDvrPlan::on_audio(__audio)) != ERROR_SUCCESS) { return ret; } return ret; } -int SrsDvrAppendPlan::on_video(SrsSharedPtrMessage* video) +int SrsDvrAppendPlan::on_video(SrsSharedPtrMessage* __video) { int ret = ERROR_SUCCESS; - if ((ret = update_duration(video)) != ERROR_SUCCESS) { + if ((ret = update_duration(__video)) != ERROR_SUCCESS) { return ret; } - if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) { + if ((ret = SrsDvrPlan::on_video(__video)) != ERROR_SUCCESS) { return ret; } @@ -1038,40 +1143,40 @@ int SrsDvrSegmentPlan::on_meta_data(SrsSharedPtrMessage* __metadata) return ret; } -int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio) +int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* __audio) { int ret = ERROR_SUCCESS; - if (SrsFlvCodec::audio_is_sequence_header(audio->payload, audio->size)) { + if (SrsFlvCodec::audio_is_sequence_header(__audio->payload, __audio->size)) { srs_freep(sh_audio); - sh_audio = audio->copy(); + sh_audio = __audio->copy(); } - if ((ret = update_duration(audio)) != ERROR_SUCCESS) { + if ((ret = update_duration(__audio)) != ERROR_SUCCESS) { return ret; } - if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) { + if ((ret = SrsDvrPlan::on_audio(__audio)) != ERROR_SUCCESS) { return ret; } return ret; } -int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* video) +int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* __video) { int ret = ERROR_SUCCESS; - if (SrsFlvCodec::video_is_sequence_header(video->payload, video->size)) { + if (SrsFlvCodec::video_is_sequence_header(__video->payload, __video->size)) { srs_freep(sh_video); - sh_video = video->copy(); + sh_video = __video->copy(); } - if ((ret = update_duration(video)) != ERROR_SUCCESS) { + if ((ret = update_duration(__video)) != ERROR_SUCCESS) { return ret; } - if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) { + if ((ret = SrsDvrPlan::on_video(__video)) != ERROR_SUCCESS) { return ret; } @@ -1240,6 +1345,30 @@ int SrsApiDvrPool::create(SrsJsonAny* json) return dvr->start(); } +int SrsApiDvrPool::stop(string vhost) +{ + int ret = ERROR_SUCCESS; + + std::vector plans; + for (int i = 0; i < (int)dvrs.size(); i++) { + SrsDvrApiPlan* plan = dvrs.at(i); + if (!vhost.empty() && plan->req->vhost != vhost) { + continue; + } + plans.push_back(plan); + } + + for (int i = 0; i < (int)plans.size(); i++) { + SrsDvrApiPlan* plan = plans.at(i); + + if ((ret = plan->stop()) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; +} + SrsDvr::SrsDvr(SrsSource* s) { source = s; diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index 17c9822daf..651a8bf606 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -236,6 +236,11 @@ class SrsDvrSessionPlan : public SrsDvrPlan */ class SrsDvrApiPlan : public SrsDvrPlan { +private: + // cache the metadata and sequence header, for new segment maybe opened. + SrsSharedPtrMessage* sh_audio; + SrsSharedPtrMessage* sh_video; + SrsSharedPtrMessage* metadata; private: std::string callback; bool autostart; @@ -247,12 +252,16 @@ class SrsDvrApiPlan : public SrsDvrPlan virtual int initialize(SrsSource* s, SrsRequest* r); virtual int on_publish(); virtual void on_unpublish(); + virtual int on_meta_data(SrsSharedPtrMessage* __metadata); + virtual int on_audio(SrsSharedPtrMessage* __audio); + virtual int on_video(SrsSharedPtrMessage* __video); public: virtual int set_path_tmpl(std::string path_tmpl); virtual int set_callback(std::string value); virtual int set_wait_keyframe(bool wait_keyframe); virtual int start(); virtual int dumps(std::stringstream& ss); + virtual int stop(); protected: virtual int on_reap_segment(); }; @@ -270,14 +279,8 @@ class SrsDvrAppendPlan : public SrsDvrPlan public: virtual int on_publish(); virtual void on_unpublish(); - /** - * @param audio, directly ptr, copy it if need to save it. - */ - virtual int on_audio(SrsSharedPtrMessage* audio); - /** - * @param video, directly ptr, copy it if need to save it. - */ - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_audio(SrsSharedPtrMessage* __audio); + virtual int on_video(SrsSharedPtrMessage* __video); private: virtual int update_duration(SrsSharedPtrMessage* msg); }; @@ -300,18 +303,9 @@ class SrsDvrSegmentPlan : public SrsDvrPlan virtual int initialize(SrsSource* source, SrsRequest* req); virtual int on_publish(); virtual void on_unpublish(); - /** - * when got metadata. - */ virtual int on_meta_data(SrsSharedPtrMessage* __metadata); - /** - * @param audio, directly ptr, copy it if need to save it. - */ - virtual int on_audio(SrsSharedPtrMessage* audio); - /** - * @param video, directly ptr, copy it if need to save it. - */ - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_audio(SrsSharedPtrMessage* __audio); + virtual int on_video(SrsSharedPtrMessage* __video); private: virtual int update_duration(SrsSharedPtrMessage* msg); }; @@ -334,6 +328,7 @@ class SrsApiDvrPool public: virtual int dumps(std::string vhost, std::stringstream& ss); virtual int create(SrsJsonAny* json); + virtual int stop(std::string vhost); }; /** diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index fde565e332..69e62e2cfe 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -501,8 +501,8 @@ int SrsGoApiDvrs::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) << __SRS_JFIELD_ORG("dvrs", data.str()) << __SRS_JOBJECT_END; } else if (r->is_http_post()) { - char* body = (char*)r->body().c_str(); - SrsJsonAny* json = SrsJsonAny::loads(body); + std::string body = r->body(); + SrsJsonAny* json = SrsJsonAny::loads((char*)body.c_str()); int ret = ERROR_SUCCESS; if (!json) { ret = ERROR_HTTP_JSON_REQUIRED; @@ -510,6 +510,12 @@ int SrsGoApiDvrs::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) SrsAutoFree(SrsJsonAny, json); ret = pool->create(json); } + ss << __SRS_JOBJECT_START + << __SRS_JFIELD_ERROR(ret) + << __SRS_JOBJECT_END; + } else if (r->is_http_delete()) { + int ret = pool->stop(r->query_get("vhost")); + ss << __SRS_JOBJECT_START << __SRS_JFIELD_ERROR(ret) << __SRS_JOBJECT_END; diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index cd73a17ebb..4a02232cc0 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -436,4 +436,60 @@ int SrsHttpHooks::on_dvr(string url, int client_id, string ip, SrsRequest* req, return ret; } +int SrsHttpHooks::on_dvr_reap_segment(string url, int client_id, SrsRequest* req, string cwd, string file) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_error("http uri parse on_dvr_reap_segment url failed, ignored. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return ret; + } + + std::stringstream ss; + ss << __SRS_JOBJECT_START + << __SRS_JFIELD_STR("action", "on_dvr_reap_segment") << __SRS_JFIELD_CONT + << __SRS_JFIELD_ORG("client_id", client_id) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("vhost", req->vhost) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("app", req->app) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("stream", req->stream) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("cwd", cwd) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("file", file) + << __SRS_JOBJECT_END; + std::string data = ss.str(); + std::string res; + int status_code; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, status_code, res)) != ERROR_SUCCESS) { + srs_error("http post on_dvr_reap_segment uri failed, ignored. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return ret; + } + + // ensure the http status is ok. + // https://github.com/winlinvip/simple-rtmp-server/issues/158 + if (status_code != SRS_CONSTS_HTTP_OK) { + ret = ERROR_HTTP_STATUS_INVLIAD; + srs_error("http hook on_dvr_reap_segment status failed. " + "client_id=%d, code=%d, ret=%d", client_id, status_code, ret); + return ret; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_warn("http hook on_dvr_reap_segment validate failed, ignored. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return ret; + } + + srs_trace("http hook on_dvr_reap_segment success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return ret; +} + #endif diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index c4286d13c3..ebc1452ba1 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -104,6 +104,15 @@ class SrsHttpHooks * @param file the file path, can be relative or absolute path. */ static int on_dvr(std::string url, int client_id, std::string ip, SrsRequest* req, std::string cwd, std::string file); + /** + * when dvr reap segment, callback. + * @param client_id the id of client on server. + * @param url the api server url, to process the event. + * ignore if empty. + * @param cwd the current work directory, used to resolve the reltive file path. + * @param file the file path, can be relative or absolute path. + */ + static int on_dvr_reap_segment(std::string url, int client_id, SrsRequest* req, std::string cwd, std::string file); }; #endif