diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index 9a1b747b6..cc426586c 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -52,6 +52,7 @@ #include #include #include +#include #if ZM_MEM_MAPPED #include @@ -1123,9 +1124,7 @@ bool Monitor::connect() { #if HAVE_LIBCURL //janus setup. Depends on libcurl. if (janus_enabled && (path.find("rtsp://") != std::string::npos)) { if (add_to_janus() != 0) { - if (add_to_janus() != 0) { //The initial attempt may fail. This is a temporary workaround. - Warning("Failed to add monitor stream to Janus!"); - } + Warning("Failed to add monitor stream to Janus!"); //The first attempt may fail. Will be reattempted in the Poller thread } } #else @@ -1797,6 +1796,8 @@ void Monitor::UpdateFPS() { //Thread where ONVIF polling, and other similar status polling can happen. //Since these can be blocking, run here to avoid intefering with other processing bool Monitor::Poll() { + //We want to trigger every 5 seconds or so. so grab the time at the beginning of the loop, and sleep at the end. + std::chrono::system_clock::time_point loop_start_time = std::chrono::system_clock::now(); #ifdef WITH_GSOAP if (ONVIF_Healthy) { @@ -1838,10 +1839,14 @@ bool Monitor::Poll() { } } } - } else { - std::this_thread::sleep_for (std::chrono::seconds(1)); //thread sleep to avoid the busy loop. } #endif + if (janus_enabled) { + if (check_janus() != 1) { + add_to_janus(); + } + } + std::this_thread::sleep_until(loop_start_time + std::chrono::seconds(5)); return TRUE; } //end Poll @@ -3166,10 +3171,8 @@ int Monitor::PrimeCapture() { } } // end if rtsp_server -#ifdef WITH_GSOAP //For now, just don't run the thread if no ONVIF support. This may change if we add other long polling options. - //ONVIF Thread - - if (onvif_event_listener && ONVIF_Healthy) { + //Poller Thread + if (onvif_event_listener || janus_enabled) { if (!Poller) { Poller = zm::make_unique(this); @@ -3177,7 +3180,7 @@ int Monitor::PrimeCapture() { Poller->Start(); } } -#endif + if (decoding_enabled) { if (!decoder_it) decoder_it = packetqueue.get_video_it(false); if (!decoder) { @@ -3381,7 +3384,6 @@ size_t Monitor::WriteCallback(void *contents, size_t size, size_t nmemb, void *u } int Monitor::add_to_janus() { - //TODO clean this up, add error checking, etc std::string response; std::string endpoint; if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { @@ -3479,8 +3481,73 @@ int Monitor::add_to_janus() { curl_easy_cleanup(curl); return 0; } + +int Monitor::check_janus() { + std::string response; + std::string endpoint; + if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { + endpoint = config.janus_path; + } else { + endpoint = "127.0.0.1:8088/janus/"; + } + std::string postData = "{\"janus\" : \"create\", \"transaction\" : \"randomString\"}"; + std::size_t pos; + CURLcode res; + + curl = curl_easy_init(); + if(!curl) return -1; + + + //Start Janus API init. Need to get a session_id and handle_id + curl_easy_setopt(curl, CURLOPT_URL, endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) return -1; + + pos = response.find("\"id\": "); + if (pos == std::string::npos) return -1; + std::string janus_id = response.substr(pos + 6, 16); + response = ""; + endpoint += "/"; + endpoint += janus_id; + postData = "{\"janus\" : \"attach\", \"plugin\" : \"janus.plugin.streaming\", \"transaction\" : \"randomString\"}"; + curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) return -1; + + pos = response.find("\"id\": "); + if (pos == std::string::npos) return -1; + std::string handle_id = response.substr(pos + 6, 16); + endpoint += "/"; + endpoint += handle_id; + + //Assemble our actual request + postData = "{\"janus\" : \"message\", \"transaction\" : \"randomString\", \"body\" : {"; + postData += "\"request\" : \"info\", \"id\" : "; + postData += std::to_string(id); + postData += "}}"; + + curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) return -1; + + Debug(1, "Queried for stream status: %s", response.c_str()); + curl_easy_cleanup(curl); + if (response.find("No such mountpoint") == std::string::npos) { + return 1; + } else { + return 0; + } +} int Monitor::remove_from_janus() { - //TODO clean this up, add error checking, etc std::string response; std::string endpoint; if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { diff --git a/src/zm_monitor.h b/src/zm_monitor.h index 898349eb3..381855c97 100644 --- a/src/zm_monitor.h +++ b/src/zm_monitor.h @@ -521,6 +521,7 @@ public: bool OnvifEnabled() { return onvif_event_listener; } + int check_janus(); //returns 1 for healthy, 0 for success but missing stream, negative for error. #ifdef WITH_GSOAP bool OnvifHealthy() { return ONVIF_Healthy;