diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 439a77291..6943a102a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -34,6 +34,8 @@ set(ZM_BIN_SRC_FILES zm_local_camera.cpp zm_monitor.cpp zm_monitor_monitorlink.cpp + zm_monitor_janus.cpp + zm_monitor_amcrest.cpp zm_monitorstream.cpp zm_ffmpeg.cpp zm_ffmpeg_camera.cpp diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index c3333e051..576b4d517 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -130,175 +130,7 @@ std::string TriggerState_Strings[] = { "Cancel", "On", "Off" }; -Monitor::MonitorLink::MonitorLink(unsigned int p_id, const char *p_name) : - id(p_id), - shared_data(nullptr), - trigger_data(nullptr), - video_store_data(nullptr) -{ - strncpy(name, p_name, sizeof(name)-1); - -#if ZM_MEM_MAPPED - map_fd = -1; - mem_file = stringtf("%s/zm.mmap.%u", staticConfig.PATH_MAP.c_str(), id); -#else // ZM_MEM_MAPPED - shm_id = 0; -#endif // ZM_MEM_MAPPED - mem_size = 0; - mem_ptr = nullptr; - - last_event_id = 0; - last_state = IDLE; - - last_connect_time = 0; - connected = false; -} - -Monitor::MonitorLink::~MonitorLink() { - disconnect(); -} - -bool Monitor::MonitorLink::connect() { - SystemTimePoint now = std::chrono::system_clock::now(); - if (!last_connect_time || (now - std::chrono::system_clock::from_time_t(last_connect_time)) > Seconds(60)) { - last_connect_time = std::chrono::system_clock::to_time_t(now); - - mem_size = sizeof(SharedData) + sizeof(TriggerData); - - Debug(1, "link.mem.size=%jd", static_cast(mem_size)); -#if ZM_MEM_MAPPED - map_fd = open(mem_file.c_str(), O_RDWR, (mode_t)0600); - if (map_fd < 0) { - Debug(3, "Can't open linked memory map file %s: %s", mem_file.c_str(), strerror(errno)); - disconnect(); - return false; - } - while (map_fd <= 2) { - int new_map_fd = dup(map_fd); - Warning("Got one of the stdio fds for our mmap handle. map_fd was %d, new one is %d", map_fd, new_map_fd); - close(map_fd); - map_fd = new_map_fd; - } - - struct stat map_stat; - if (fstat(map_fd, &map_stat) < 0) { - Error("Can't stat linked memory map file %s: %s", mem_file.c_str(), strerror(errno)); - disconnect(); - return false; - } - - if (map_stat.st_size == 0) { - Error("Linked memory map file %s is empty: %s", mem_file.c_str(), strerror(errno)); - disconnect(); - return false; - } else if (map_stat.st_size < mem_size) { - Error("Got unexpected memory map file size %ld, expected %jd", map_stat.st_size, static_cast(mem_size)); - disconnect(); - return false; - } - - mem_ptr = (unsigned char *)mmap(nullptr, mem_size, PROT_READ|PROT_WRITE, MAP_SHARED, map_fd, 0); - if (mem_ptr == MAP_FAILED) { - Error("Can't map file %s (%jd bytes) to memory: %s", mem_file.c_str(), static_cast(mem_size), strerror(errno)); - disconnect(); - return false; - } -#else // ZM_MEM_MAPPED - shm_id = shmget((config.shm_key&0xffff0000)|id, mem_size, 0700); - if (shm_id < 0) { - Debug(3, "Can't shmget link memory: %s", strerror(errno)); - connected = false; - return false; - } - mem_ptr = (unsigned char *)shmat(shm_id, 0, 0); - if ((int)mem_ptr == -1) { - Debug(3, "Can't shmat link memory: %s", strerror(errno)); - connected = false; - return false; - } -#endif // ZM_MEM_MAPPED - - shared_data = (SharedData *)mem_ptr; - trigger_data = (TriggerData *)((char *)shared_data + sizeof(SharedData)); - - if (!shared_data->valid) { - Debug(3, "Linked memory not initialised by capture daemon"); - disconnect(); - return false; - } - - last_state = shared_data->state; - last_event_id = shared_data->last_event_id; - connected = true; - - return true; - } - return false; -} // end bool Monitor::MonitorLink::connect() - -bool Monitor::MonitorLink::disconnect() { - if (connected) { - connected = false; - -#if ZM_MEM_MAPPED - if (mem_ptr > (void *)0) { - msync(mem_ptr, mem_size, MS_ASYNC); - munmap(mem_ptr, mem_size); - } - if (map_fd >= 0) - close(map_fd); - - map_fd = -1; -#else // ZM_MEM_MAPPED - struct shmid_ds shm_data; - if (shmctl(shm_id, IPC_STAT, &shm_data) < 0) { - Debug(3, "Can't shmctl: %s", strerror(errno)); - return false; - } - - shm_id = 0; - - if (shm_data.shm_nattch <= 1) { - if (shmctl(shm_id, IPC_RMID, 0) < 0) { - Debug(3, "Can't shmctl: %s", strerror(errno)); - return false; - } - } - - if (shmdt(mem_ptr) < 0) { - Debug(3, "Can't shmdt: %s", strerror(errno)); - return false; - } -#endif // ZM_MEM_MAPPED - mem_size = 0; - mem_ptr = nullptr; - } - return true; -} - -bool Monitor::MonitorLink::isAlarmed() { - if (!connected) { - return false; - } - return( shared_data->state == ALARM ); -} - -bool Monitor::MonitorLink::inAlarm() { - if (!connected) { - return false; - } - return( shared_data->state == ALARM || shared_data->state == ALERT ); -} - -bool Monitor::MonitorLink::hasAlarmed() { - if (shared_data->state == ALARM) { - return true; - } - last_event_id = shared_data->last_event_id; - return false; -} - -Monitor::Monitor() +Monitor::Monitor() : id(0), name(""), server_id(0), @@ -317,7 +149,7 @@ Monitor::Monitor() //user //pass //path - //device + //device palette(0), channel(0), format(0), @@ -422,7 +254,9 @@ Monitor::Monitor() privacy_bitmask(nullptr), n_linked_monitors(0), linked_monitors(nullptr), - ONVIF_Closes_Event(FALSE), + Event_Poller_Closes_Event(FALSE), + Janus_Manager(nullptr), + Amcrest_Manager(nullptr), #ifdef WITH_GSOAP soap(nullptr), #endif @@ -1083,21 +917,16 @@ bool Monitor::connect() { //ONVIF and Amcrest Setup - //since they serve the same function, handling them as two options of the same feature. - ONVIF_Trigger_State = FALSE; + //For now, only support one event type per camera, so share some state. + Poll_Trigger_State = FALSE; if (onvif_event_listener) { // Debug(1, "Starting ONVIF"); - ONVIF_Healthy = FALSE; + Event_Poller_Healthy = FALSE; if (onvif_options.find("closes_event") != std::string::npos) { //Option to indicate that ONVIF will send a close event message - ONVIF_Closes_Event = TRUE; + Event_Poller_Closes_Event = TRUE; } if (use_Amcrest_API) { - curl_multi = curl_multi_init(); - start_Amcrest(); - //spin up curl_multi - //use the onvif_user and onvif_pass and onvif_url here. - //going to use the non-blocking curl api, and in the polling thread, block for 5 seconds waiting for input, just like onvif - //note that it's not possible for a single camera to use both. + Amcrest_Manager = new AmcrestAPI(this); } else { //using GSOAP #ifdef WITH_GSOAP tev__PullMessages.Timeout = "PT600S"; @@ -1123,7 +952,7 @@ bool Monitor::connect() { Error("Couldn't do initial event pull! Error %i %s, %s", soap->error, soap_fault_string(soap), soap_fault_detail(soap)); } else { Debug(1, "Good Initial ONVIF Pull"); - ONVIF_Healthy = TRUE; + Event_Poller_Healthy = TRUE; } } #else @@ -1135,17 +964,9 @@ bool Monitor::connect() { } //End ONVIF Setup -#if HAVE_LIBCURL //janus setup. Depends on libcurl. - if (janus_enabled && (path.find("rtsp://") != std::string::npos)) { - get_janus_session(); - if (add_to_janus() != 0) { - Warning("Failed to add monitor stream to Janus!"); //The first attempt may fail. Will be reattempted in the Poller thread - } + if (janus_enabled) { + Janus_Manager = new JanusManager(this); } -#else - if (janus_enabled) - Error("zmc not compiled with LIBCURL. Janus support not built in!"); -#endif } else if (!shared_data->valid) { Error("Shared data not initialised by capture daemon for monitor %s", name.c_str()); @@ -1242,12 +1063,12 @@ Monitor::~Monitor() { sws_freeContext(convert_context); convert_context = nullptr; } - if (Amcrest_handle != nullptr) { - curl_multi_remove_handle(curl_multi, Amcrest_handle); - curl_easy_cleanup(Amcrest_handle); + if (Amcrest_Manager != nullptr) { + delete Amcrest_Manager; + } + if (purpose == CAPTURE) { + curl_global_cleanup(); //not sure about this location. } - if (curl_multi != nullptr) curl_multi_cleanup(curl_multi); - curl_global_cleanup(); } // end Monitor::~Monitor() void Monitor::AddPrivacyBitmask() { @@ -1817,38 +1638,13 @@ void Monitor::UpdateFPS() { //Thread where ONVIF polling, and other similar status polling can happen. //Since these can be blocking, run here to avoid intefering with other processing bool Monitor::Poll() { + //We want to trigger every 5 seconds or so. so grab the time at the beginning of the loop, and sleep at the end. std::chrono::system_clock::time_point loop_start_time = std::chrono::system_clock::now(); - if (ONVIF_Healthy) { + if (Event_Poller_Healthy) { if(use_Amcrest_API) { - int open_handles; - int transfers; - curl_multi_perform(curl_multi, &open_handles); - if (open_handles == 0) { - start_Amcrest(); //http transfer ended, need to restart. - } else { - curl_multi_wait(curl_multi, NULL, 0, 5000, &transfers); //wait for max 5 seconds for event. - if (transfers > 0) { //have data to deal with - curl_multi_perform(curl_multi, &open_handles); //actually grabs the data, populates amcrest_response - if (amcrest_response.find("action=Start") != std::string::npos) { - //Event Start - Debug(1,"Triggered on ONVIF"); - if (!ONVIF_Trigger_State) { - Debug(1,"Triggered Event"); - ONVIF_Trigger_State = TRUE; - } - } else if (amcrest_response.find("action=Stop") != std::string::npos){ - Debug(1, "Triggered off ONVIF"); - ONVIF_Trigger_State = FALSE; - if (!ONVIF_Closes_Event) { //If we get a close event, then we know to expect them. - ONVIF_Closes_Event = TRUE; - Debug(1,"Setting ClosesEvent"); - } - } - amcrest_response.clear(); //We've dealt with the message, need to clear the queue - } - } + Amcrest_Manager->WaitForMessage(); } else { #ifdef WITH_GSOAP @@ -1857,7 +1653,7 @@ bool Monitor::Poll() { if (result != SOAP_OK) { if (result != SOAP_EOF) { //Ignore the timeout error Error("Failed to get ONVIF messages! %s", soap_fault_string(soap)); - ONVIF_Healthy = FALSE; + Event_Poller_Healthy = FALSE; } } else { Debug(1, "Got Good Response! %i", result); @@ -1874,16 +1670,16 @@ bool Monitor::Poll() { if (strcmp(msg->Message.__any.elts->next->elts->atts->next->text, "true") == 0) { //Event Start Debug(1,"Triggered on ONVIF"); - if (!ONVIF_Trigger_State) { + if (!Poll_Trigger_State) { Debug(1,"Triggered Event"); - ONVIF_Trigger_State = TRUE; + Poll_Trigger_State = TRUE; std::this_thread::sleep_for (std::chrono::seconds(1)); //thread sleep } } else { Debug(1, "Triggered off ONVIF"); - ONVIF_Trigger_State = FALSE; - if (!ONVIF_Closes_Event) { //If we get a close event, then we know to expect them. - ONVIF_Closes_Event = TRUE; + Poll_Trigger_State = FALSE; + if (!Event_Poller_Closes_Event) { //If we get a close event, then we know to expect them. + Event_Poller_Closes_Event = TRUE; Debug(1,"Setting ClosesEvent"); } } @@ -1894,11 +1690,9 @@ bool Monitor::Poll() { } } if (janus_enabled) { - if (janus_session.empty()) { - get_janus_session(); - } - if (check_janus() == 0) { - add_to_janus(); + + if (Janus_Manager->check_janus() == 0) { + Janus_Manager->add_to_janus(); } } std::this_thread::sleep_until(loop_start_time + std::chrono::seconds(5)); @@ -1953,8 +1747,8 @@ bool Monitor::Analyse() { Event::StringSetMap noteSetMap; #ifdef WITH_GSOAP - if (onvif_event_listener && ONVIF_Healthy) { - if (ONVIF_Trigger_State) { + if (onvif_event_listener && Event_Poller_Healthy) { + if (Poll_Trigger_State) { score += 9; Debug(1, "Triggered on ONVIF"); Event::StringSet noteSet; @@ -1962,10 +1756,10 @@ bool Monitor::Analyse() { noteSetMap[MOTION_CAUSE] = noteSet; cause += "ONVIF"; //If the camera isn't going to send an event close, we need to close it here, but only after it has actually triggered an alarm. - if (!ONVIF_Closes_Event && state == ALARM) - ONVIF_Trigger_State = FALSE; + if (!Event_Poller_Closes_Event && state == ALARM) + Poll_Trigger_State = FALSE; } // end ONVIF_Trigger - } // end if (onvif_event_listener && ONVIF_Healthy) + } // end if (onvif_event_listener && Event_Poller_Healthy) #endif // Specifically told to be on. Setting the score here is not enough to trigger the alarm. Must jump directly to ALARM @@ -3287,8 +3081,7 @@ int Monitor::PrimeCapture() { } // end if rtsp_server //Poller Thread - if (onvif_event_listener || janus_enabled) { - + if (onvif_event_listener || janus_enabled || use_Amcrest_API) { if (!Poller) { Poller = zm::make_unique(this); } else { @@ -3338,10 +3131,6 @@ int Monitor::Close() { if (Poller) { Poller->Stop(); } - if (curl_multi != nullptr) { - curl_multi_cleanup(curl_multi); - curl_multi = nullptr; - } #ifdef WITH_GSOAP if (onvif_event_listener && (soap != nullptr)) { Debug(1, "Tearing Down Onvif"); @@ -3354,11 +3143,11 @@ int Monitor::Close() { soap = nullptr; } //End ONVIF #endif -#if HAVE_LIBCURL //Janus Teardown + //Janus Teardown if (janus_enabled && (purpose == CAPTURE)) { - remove_from_janus(); + delete Janus_Manager; } -#endif + packetqueue.clear(); if (audio_fifo) { @@ -3495,289 +3284,3 @@ int SOAP_ENV__Fault(struct soap *soap, char *faultcode, char *faultstring, char return soap_send_empty_response(soap, SOAP_OK); } #endif - -size_t Monitor::WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) -{ - ((std::string*)userp)->append((char*)contents, size * nmemb); - return size * nmemb; -} - -int Monitor::add_to_janus() { - std::string response; - std::string endpoint; - if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { - endpoint = config.janus_path; - } else { - endpoint = "127.0.0.1:8088/janus/"; - } - std::string postData = "{\"janus\" : \"create\", \"transaction\" : \"randomString\"}"; - std::string rtsp_username; - std::string rtsp_password; - std::string rtsp_path = "rtsp://"; - std::size_t pos; - std::size_t pos2; - CURLcode res; - - curl = curl_easy_init(); - if (!curl) { - Error("Failed to init curl"); - return -1; - } - //parse username and password - pos = path.find(":", 7); - if (pos == std::string::npos) return -1; - rtsp_username = path.substr(7, pos-7); - - pos2 = path.find("@", pos); - if (pos2 == std::string::npos) return -1; - - rtsp_password = path.substr(pos+1, pos2 - pos - 1); - rtsp_path += path.substr(pos2 + 1); - - endpoint += "/"; - endpoint += janus_session; - - //Assemble our actual request - postData = "{\"janus\" : \"message\", \"transaction\" : \"randomString\", \"body\" : {"; - postData += "\"request\" : \"create\", \"admin_key\" : \""; - postData += config.janus_secret; - postData += "\", \"type\" : \"rtsp\", "; - postData += "\"url\" : \""; - postData += rtsp_path; - postData += "\", \"rtsp_user\" : \""; - postData += rtsp_username; - postData += "\", \"rtsp_pwd\" : \""; - postData += rtsp_password; - postData += "\", \"id\" : "; - postData += std::to_string(id); - if (janus_audio_enabled) postData += ", \"audio\" : true"; - postData += ", \"video\" : true}}"; - - curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); - res = curl_easy_perform(curl); - if (res != CURLE_OK) { - Error("Failed to curl_easy_perform adding rtsp stream"); - curl_easy_cleanup(curl); - return -1; - } - if ((response.find("error") != std::string::npos) && ((response.find("No such session") != std::string::npos) || (response.find("No such handle") != std::string::npos))) { - janus_session = ""; - curl_easy_cleanup(curl); - return -2; - } - //scan for missing session or handle id "No such session" "no such handle" - - Debug(1,"Added stream to Janus: %s", response.c_str()); - curl_easy_cleanup(curl); - return 0; -} - -int Monitor::check_janus() { - std::string response; - std::string endpoint; - if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { - endpoint = config.janus_path; - } else { - endpoint = "127.0.0.1:8088/janus/"; - } - std::string postData; - //std::size_t pos; - CURLcode res; - - curl = curl_easy_init(); - if(!curl) return -1; - - endpoint += "/"; - endpoint += janus_session; - - //Assemble our actual request - postData = "{\"janus\" : \"message\", \"transaction\" : \"randomString\", \"body\" : {"; - postData += "\"request\" : \"info\", \"id\" : "; - postData += std::to_string(id); - postData += "}}"; - - curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); - res = curl_easy_perform(curl); - if (res != CURLE_OK) { //may mean an error code thrown by Janus, because of a bad session - Warning("Attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); - curl_easy_cleanup(curl); - janus_session = ""; - return -1; - } - - curl_easy_cleanup(curl); - Debug(1, "Queried for stream status: %s", response.c_str()); - if ((response.find("error") != std::string::npos) && ((response.find("No such session") != std::string::npos) || (response.find("No such handle") != std::string::npos))) { - Warning("Janus Session timed out"); - janus_session = ""; - return -2; - } else if (response.find("No such mountpoint") != std::string::npos) { - Warning("Mountpoint Missing"); - return 0; - } else { - return 1; - } -} - - -int Monitor::remove_from_janus() { - std::string response; - std::string endpoint; - if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { - endpoint = config.janus_path; - } else { - endpoint = "127.0.0.1:8088/janus/"; - } - std::string postData = "{\"janus\" : \"create\", \"transaction\" : \"randomString\"}"; - //std::size_t pos; - CURLcode res; - - curl = curl_easy_init(); - if(!curl) return -1; - - endpoint += "/"; - endpoint += janus_session; - - //Assemble our actual request - postData = "{\"janus\" : \"message\", \"transaction\" : \"randomString\", \"body\" : {"; - postData += "\"request\" : \"destroy\", \"admin_key\" : \""; - postData += config.janus_secret; - postData += "\", \"id\" : "; - postData += std::to_string(id); - postData += "}}"; - - curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); - res = curl_easy_perform(curl); - if (res != CURLE_OK) { - Warning("Libcurl attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); - curl_easy_cleanup(curl); - return -1; - } - - Debug(1, "Removed stream from Janus: %s", response.c_str()); - curl_easy_cleanup(curl); - return 0; -} - -int Monitor::get_janus_session() { - std::string response; - std::string endpoint; - if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { - endpoint = config.janus_path; - } else { - endpoint = "127.0.0.1:8088/janus/"; - } - std::string postData = "{\"janus\" : \"create\", \"transaction\" : \"randomString\"}"; - std::size_t pos; - CURLcode res; - curl = curl_easy_init(); - if(!curl) return -1; - - //Start Janus API init. Need to get a session_id and handle_id - curl_easy_setopt(curl, CURLOPT_URL, endpoint.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); - res = curl_easy_perform(curl); - if (res != CURLE_OK) { - Warning("Libcurl attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); - curl_easy_cleanup(curl); - return -1; - } - - pos = response.find("\"id\": "); - if (pos == std::string::npos) - { - curl_easy_cleanup(curl); - return -1; - } - janus_session = response.substr(pos + 6, 16); - - response = ""; - endpoint += "/"; - endpoint += janus_session; - postData = "{\"janus\" : \"attach\", \"plugin\" : \"janus.plugin.streaming\", \"transaction\" : \"randomString\"}"; - curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); - res = curl_easy_perform(curl); - if (res != CURLE_OK) - { - Warning("Libcurl attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); - curl_easy_cleanup(curl); - return -1; - } - - pos = response.find("\"id\": "); - if (pos == std::string::npos) - { - curl_easy_cleanup(curl); - return -1; - } - janus_session += "/"; - janus_session += response.substr(pos + 6, 16); - curl_easy_cleanup(curl); - return 1; -} //get_janus_session - -int Monitor::start_Amcrest() { - //init the transfer and start it in multi-handle - int running_handles; - long response_code; - struct CURLMsg *m; - CURLMcode curl_error; - if (Amcrest_handle != nullptr) { //potentially clean up the old handle - curl_multi_remove_handle(curl_multi, Amcrest_handle); - curl_easy_cleanup(Amcrest_handle); - } - - std::string full_url = onvif_url; - if (full_url.back() != '/') full_url += '/'; - full_url += "eventManager.cgi?action=attach&codes=[VideoMotion]"; - Amcrest_handle = curl_easy_init(); - if (!Amcrest_handle){ - Warning("Handle is null!"); - return -1; - } - curl_easy_setopt(Amcrest_handle, CURLOPT_URL, full_url.c_str()); - curl_easy_setopt(Amcrest_handle, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(Amcrest_handle, CURLOPT_WRITEDATA, &amcrest_response); - curl_easy_setopt(Amcrest_handle, CURLOPT_USERNAME, onvif_username.c_str()); - curl_easy_setopt(Amcrest_handle, CURLOPT_PASSWORD, onvif_password.c_str()); - curl_easy_setopt(Amcrest_handle, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST); - curl_error = curl_multi_add_handle(curl_multi, Amcrest_handle); - Warning("error of %i", curl_error); - curl_error = curl_multi_perform(curl_multi, &running_handles); - if (curl_error == CURLM_OK) { - curl_easy_getinfo(Amcrest_handle, CURLINFO_RESPONSE_CODE, &response_code); - int msgq = 0; - m = curl_multi_info_read(curl_multi, &msgq); - if (m && (m->msg == CURLMSG_DONE)) { - Warning("Libcurl exited Early: %i", m->data.result); - } - - curl_multi_wait(curl_multi, NULL, 0, 300, NULL); - curl_error = curl_multi_perform(curl_multi, &running_handles); - } - - if ((curl_error == CURLM_OK) && (running_handles > 0)) { - ONVIF_Healthy = TRUE; - } else { - Warning("Response: %s", amcrest_response.c_str()); - Warning("Seeing %i streams, and error of %i, url: %s", running_handles, curl_error, full_url.c_str()); - curl_easy_getinfo(Amcrest_handle, CURLINFO_OS_ERRNO, &response_code); - Warning("Response code: %lu", response_code); - } - -return 0; -} diff --git a/src/zm_monitor.h b/src/zm_monitor.h index 4ce0a9b08..30bc57d87 100644 --- a/src/zm_monitor.h +++ b/src/zm_monitor.h @@ -93,7 +93,7 @@ public: } Orientation; typedef enum { - DEINTERLACE_DISABLED = 0x00000000, + DEINTERLACE_DISABLED = 0x00000000, DEINTERLACE_FOUR_FIELD_SOFT = 0x00001E04, DEINTERLACE_FOUR_FIELD_MEDIUM = 0x00001404, DEINTERLACE_FOUR_FIELD_HARD = 0x00000A04, @@ -154,12 +154,12 @@ protected: uint32_t last_frame_score; /* +60 */ uint32_t audio_frequency; /* +64 */ uint32_t audio_channels; /* +68 */ - /* + /* ** This keeps 32bit time_t and 64bit time_t identical and compatible as long as time is before 2038. ** Shared memory layout should be identical for both 32bit and 64bit and is multiples of 16. - ** Because startup_time is 64bit it may be aligned to a 64bit boundary. So it's offset SHOULD be a multiple + ** Because startup_time is 64bit it may be aligned to a 64bit boundary. So it's offset SHOULD be a multiple ** of 8. Add or delete epadding's to achieve this. - */ + */ union { /* +72 */ time_t startup_time; /* When the zmc process started. zmwatch uses this to see how long the process has been running without getting any images */ uint64_t extrapad1; @@ -256,7 +256,49 @@ protected: bool hasAlarmed(); }; + class AmcrestAPI { protected: + Monitor *parent; + std::string amcrest_response; + CURLM *curl_multi = nullptr; + CURL *Amcrest_handle = nullptr; + static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp); + + public: + AmcrestAPI( Monitor *parent_); + ~AmcrestAPI(); + int API_Connect(); + void WaitForMessage(); + bool Amcrest_Alarmed; + int start_Amcrest(); + }; + + class JanusManager { + protected: + Monitor *parent; + CURL *curl = nullptr; + //helper class for CURL + static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp); + bool Janus_Healthy; + std::string janus_session; + std::string janus_handle; + std::string janus_endpoint; + std::string stream_key; + std::string rtsp_username; + std::string rtsp_password; + std::string rtsp_path; + + public: + JanusManager(Monitor *parent_); + ~JanusManager(); + int add_to_janus(); + int check_janus(); + int remove_from_janus(); + int get_janus_session(); + int get_janus_handle(); + int get_janus_plugin(); + std::string get_stream_key(); + }; // These are read from the DB and thereafter remain unchanged @@ -285,7 +327,6 @@ protected: std::string onvif_username; std::string onvif_password; std::string onvif_options; - std::string amcrest_response; bool onvif_event_listener; bool use_Amcrest_API; @@ -438,9 +479,12 @@ protected: std::string diag_path_delta; //ONVIF - bool ONVIF_Trigger_State; //Re-using some variables for Amcrest API support - bool ONVIF_Healthy; - bool ONVIF_Closes_Event; + bool Poll_Trigger_State; + bool Event_Poller_Healthy; + bool Event_Poller_Closes_Event; + + JanusManager *Janus_Manager; + AmcrestAPI *Amcrest_Manager; #ifdef WITH_GSOAP struct soap *soap = nullptr; @@ -452,17 +496,6 @@ protected: void set_credentials(struct soap *soap); #endif - //curl stuff - CURL *curl = nullptr; - CURLM *curl_multi = nullptr; - CURL *Amcrest_handle = nullptr; - int start_Amcrest(); - //helper class for CURL - static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp); - int add_to_janus(); - int remove_from_janus(); - int get_janus_session(); - std::string janus_session; // Used in check signal uint8_t red_val; @@ -530,11 +563,9 @@ public: return onvif_event_listener; } int check_janus(); //returns 1 for healthy, 0 for success but missing stream, negative for error. -#ifdef WITH_GSOAP - bool OnvifHealthy() { - return ONVIF_Healthy; + bool EventPollerHealthy() { + return Event_Poller_Healthy; } -#endif inline const char *EventPrefix() const { return event_prefix.c_str(); } inline bool Ready() const { if ( image_count >= ready_count ) { @@ -583,7 +614,7 @@ public: void SetVideoWriterStartTime(SystemTimePoint t) { video_store_data->recording = zm::chrono::duration_cast(t.time_since_epoch()); } - + unsigned int GetPreEventCount() const { return pre_event_count; }; int32_t GetImageBufferCount() const { return image_buffer_count; }; State GetState() const { return (State)shared_data->state; } diff --git a/src/zm_monitor_amcrest.cpp b/src/zm_monitor_amcrest.cpp new file mode 100644 index 000000000..04af8734c --- /dev/null +++ b/src/zm_monitor_amcrest.cpp @@ -0,0 +1,126 @@ +// +// ZoneMinder Monitor::AmcrestAPI Class Implementation, $Date$, $Revision$ +// Copyright (C) 2022 Jonathan Bennett +// +// This program is free software; you can redistribute it and/or +// modify it under the terms of the GNU General Public License +// as published by the Free Software Foundation; either version 2 +// of the License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +// + +#include "zm_monitor.h" + + +Monitor::AmcrestAPI::AmcrestAPI(Monitor *parent_) { + parent = parent_; + curl_multi = curl_multi_init(); + start_Amcrest(); +} + +Monitor::AmcrestAPI::~AmcrestAPI() { + if (Amcrest_handle != nullptr) { //potentially clean up the old handle + curl_multi_remove_handle(curl_multi, Amcrest_handle); + curl_easy_cleanup(Amcrest_handle); + } + if (curl_multi != nullptr) curl_multi_cleanup(curl_multi); +} + +int Monitor::AmcrestAPI::start_Amcrest() { + //init the transfer and start it in multi-handle + int running_handles; + long response_code; + struct CURLMsg *m; + CURLMcode curl_error; + if (Amcrest_handle != nullptr) { //potentially clean up the old handle + curl_multi_remove_handle(curl_multi, Amcrest_handle); + curl_easy_cleanup(Amcrest_handle); + } + + std::string full_url = parent->onvif_url; + if (full_url.back() != '/') full_url += '/'; + full_url += "eventManager.cgi?action=attach&codes=[VideoMotion]"; + Amcrest_handle = curl_easy_init(); + if (!Amcrest_handle){ + Warning("Handle is null!"); + return -1; + } + curl_easy_setopt(Amcrest_handle, CURLOPT_URL, full_url.c_str()); + curl_easy_setopt(Amcrest_handle, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(Amcrest_handle, CURLOPT_WRITEDATA, &amcrest_response); + curl_easy_setopt(Amcrest_handle, CURLOPT_USERNAME, parent->onvif_username.c_str()); + curl_easy_setopt(Amcrest_handle, CURLOPT_PASSWORD, parent->onvif_password.c_str()); + curl_easy_setopt(Amcrest_handle, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST); + curl_error = curl_multi_add_handle(curl_multi, Amcrest_handle); + if (curl_error != CURLM_OK) { + Warning("error of %i", curl_error); + } + curl_error = curl_multi_perform(curl_multi, &running_handles); + if (curl_error == CURLM_OK) { + curl_easy_getinfo(Amcrest_handle, CURLINFO_RESPONSE_CODE, &response_code); + int msgq = 0; + m = curl_multi_info_read(curl_multi, &msgq); + if (m && (m->msg == CURLMSG_DONE)) { + Warning("Libcurl exited Early: %i", m->data.result); + } + + curl_multi_wait(curl_multi, NULL, 0, 300, NULL); + curl_error = curl_multi_perform(curl_multi, &running_handles); + } + + if ((curl_error == CURLM_OK) && (running_handles > 0)) { + parent->Event_Poller_Healthy = TRUE; + } else { + Warning("Response: %s", amcrest_response.c_str()); + Warning("Seeing %i streams, and error of %i, url: %s", running_handles, curl_error, full_url.c_str()); + curl_easy_getinfo(Amcrest_handle, CURLINFO_OS_ERRNO, &response_code); + Warning("Response code: %lu", response_code); + } + +return 0; +} + +void Monitor::AmcrestAPI::WaitForMessage() { + int open_handles; + int transfers; + curl_multi_perform(curl_multi, &open_handles); + if (open_handles == 0) { + start_Amcrest(); //http transfer ended, need to restart. + } else { + curl_multi_wait(curl_multi, NULL, 0, 5000, &transfers); //wait for max 5 seconds for event. + if (transfers > 0) { //have data to deal with + curl_multi_perform(curl_multi, &open_handles); //actually grabs the data, populates amcrest_response + if (amcrest_response.find("action=Start") != std::string::npos) { + //Event Start + Debug(1,"Triggered on ONVIF"); + if (!parent->Poll_Trigger_State) { + Debug(1,"Triggered Event"); + parent->Poll_Trigger_State = TRUE; + } + } else if (amcrest_response.find("action=Stop") != std::string::npos){ + Debug(1, "Triggered off ONVIF"); + parent->Poll_Trigger_State = FALSE; + if (!parent->Event_Poller_Closes_Event) { //If we get a close event, then we know to expect them. + parent->Event_Poller_Closes_Event = TRUE; + Debug(1,"Setting ClosesEvent"); + } + } + amcrest_response.clear(); //We've dealt with the message, need to clear the queue + } + } + return; +} + +size_t Monitor::AmcrestAPI::WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) +{ + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; +} diff --git a/src/zm_monitor_janus.cpp b/src/zm_monitor_janus.cpp new file mode 100644 index 000000000..2fe9b3f26 --- /dev/null +++ b/src/zm_monitor_janus.cpp @@ -0,0 +1,316 @@ +// +// ZoneMinder Monitor::JanusManager Class Implementation, $Date$, $Revision$ +// Copyright (C) 2022 Jonathan Bennett +// +// This program is free software; you can redistribute it and/or +// modify it under the terms of the GNU General Public License +// as published by the Free Software Foundation; either version 2 +// of the License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +// + +#include "zm_monitor.h" + + +Monitor::JanusManager::JanusManager(Monitor *parent_) { //constructor takes care of init and calls add_to + std::string response; + std::size_t pos; + parent = parent_; + if ((config.janus_path != nullptr) && (config.janus_path[0] != '\0')) { + janus_endpoint = config.janus_path; //TODO: strip trailing / + } else { + janus_endpoint = "127.0.0.1:8088/janus"; + } + if (janus_endpoint.back() == '/') janus_endpoint.pop_back(); //remove the trailing slash if present + std::size_t pos2 = parent->path.find("@", pos); + if (pos2 != std::string::npos) { //If we find an @ symbol, we have a username/password. Otherwise, passwordless login. + + std::size_t pos = parent->path.find(":", 7); //Search for the colon, but only after the RTSP:// text. + if (pos == std::string::npos) throw std::runtime_error("Cannot Parse URL for Janus."); //Looks like an invalid url + rtsp_username = parent->path.substr(7, pos-7); + + rtsp_password = parent->path.substr(pos+1, pos2 - pos - 1); + rtsp_path = "RTSP://"; + rtsp_path += parent->path.substr(pos2 + 1); + + } else { + rtsp_username = ""; + rtsp_password = ""; + rtsp_path = parent->path; + } +} + +Monitor::JanusManager::~JanusManager() { + if (janus_session.empty()) get_janus_session(); + if (janus_handle.empty()) get_janus_handle(); + + std::string response; + std::string endpoint; + + std::string postData = "{\"janus\" : \"create\", \"transaction\" : \"randomString\"}"; + //std::size_t pos; + CURLcode res; + + curl = curl_easy_init(); + if(!curl) return; + + endpoint = janus_endpoint; + endpoint += "/"; + endpoint += janus_session; + endpoint += "/"; + endpoint += janus_handle; + + //Assemble our actual request + postData = "{\"janus\" : \"message\", \"transaction\" : \"randomString\", \"body\" : {"; + postData += "\"request\" : \"destroy\", \"admin_key\" : \""; + postData += config.janus_secret; + postData += "\", \"id\" : "; + postData += std::to_string(parent->id); + postData += "}}"; + + curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) { + Warning("Libcurl attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); + curl_easy_cleanup(curl); + return; + } + + Debug(1, "Removed stream from Janus: %s", response.c_str()); + curl_easy_cleanup(curl); + return; +} + + + +int Monitor::JanusManager::check_janus() { + if (janus_session.empty()) get_janus_session(); + if (janus_handle.empty()) get_janus_handle(); + + std::string response; + std::string endpoint = janus_endpoint; + std::string postData; + //std::size_t pos; + CURLcode res; + + curl = curl_easy_init(); + if(!curl) return -1; + + endpoint += "/"; + endpoint += janus_session; + endpoint += "/"; + endpoint += janus_handle; + + //Assemble our actual request + postData = "{\"janus\" : \"message\", \"transaction\" : \"randomString\", \"body\" : {"; + postData += "\"request\" : \"info\", \"id\" : "; + postData += std::to_string(parent->id); + postData += "}}"; + + curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) { //may mean an error code thrown by Janus, because of a bad session + Warning("Attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); + curl_easy_cleanup(curl); + janus_session = ""; + janus_handle = ""; + return -1; + } + + curl_easy_cleanup(curl); + Debug(1, "Queried for stream status: %s", response.c_str()); + if (response.find("\"janus\": \"error\"") != std::string::npos) { + if (response.find("No such session") != std::string::npos) { + Warning("Janus Session timed out"); + janus_session = ""; + return -2; + } else if (response.find("No such handle") != std::string::npos) { + Warning("Janus Handle timed out"); + janus_handle = ""; + return -2; + } + } else if (response.find("No such mountpoint") != std::string::npos) { + Warning("Mountpoint Missing"); + return 0; + } + return 1; +} + +int Monitor::JanusManager::add_to_janus() { + if (janus_session.empty()) get_janus_session(); + if (janus_handle.empty()) get_janus_handle(); + + std::string response; + std::string endpoint = janus_endpoint; + + CURLcode res; + + curl = curl_easy_init(); + if (!curl) { + Error("Failed to init curl"); + return -1; + } + + endpoint += "/"; + endpoint += janus_session; + endpoint += "/"; + endpoint += janus_handle; + + //Assemble our actual request + std::string postData = "{\"janus\" : \"message\", \"transaction\" : \"randomString\", \"body\" : {"; + postData += "\"request\" : \"create\", \"admin_key\" : \""; + postData += config.janus_secret; + postData += "\", \"type\" : \"rtsp\", "; + postData += "\"url\" : \""; + postData += rtsp_path; + if (rtsp_username != "") { + postData += "\", \"rtsp_user\" : \""; + postData += rtsp_username; + postData += "\", \"rtsp_pwd\" : \""; + postData += rtsp_password; + } + postData += "\", \"id\" : "; + postData += std::to_string(parent->id); + if (parent->janus_audio_enabled) postData += ", \"audio\" : true"; + postData += ", \"video\" : true}}"; + Warning("Sending %s to %s", postData.c_str(), endpoint.c_str()); + + curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) { + Error("Failed to curl_easy_perform adding rtsp stream"); + curl_easy_cleanup(curl); + return -1; + } + if (response.find("\"janus\": \"error\"") != std::string::npos) { + if (response.find("No such session") != std::string::npos) { + Warning("Janus Session timed out"); + janus_session = ""; + return -2; + } else if (response.find("No such handle") != std::string::npos) { + Warning("Janus Handle timed out"); + janus_handle = ""; + return -2; + } + } + //scan for missing session or handle id "No such session" "no such handle" + + Debug(1,"Added stream to Janus: %s", response.c_str()); + curl_easy_cleanup(curl); + return 0; +} + + +size_t Monitor::JanusManager::WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) +{ + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; +} + +/* +void Monitor::JanusManager::generateKey() +{ + const std::string CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + + std::random_device random_device; + std::mt19937 generator(random_device()); + std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1); + + std::string random_string; + + for (std::size_t i = 0; i < 16; ++i) + { + random_string += CHARACTERS[distribution(generator)]; + } + + stream_key = random_string; +} +*/ + + +int Monitor::JanusManager::get_janus_session() { + janus_session = ""; + std::string endpoint = janus_endpoint; + + std::string response; + + std::string postData = "{\"janus\" : \"create\", \"transaction\" : \"randomString\"}"; + std::size_t pos; + CURLcode res; + curl = curl_easy_init(); + if(!curl) return -1; + + curl_easy_setopt(curl, CURLOPT_URL, endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) { + Warning("Libcurl attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); + curl_easy_cleanup(curl); + return -1; + } + + pos = response.find("\"id\": "); + if (pos == std::string::npos) + { + curl_easy_cleanup(curl); + return -1; + } + janus_session = response.substr(pos + 6, 16); + curl_easy_cleanup(curl); + return 1; + +} //get_janus_session + +int Monitor::JanusManager::get_janus_handle() { + std::string response = ""; + std::string endpoint = janus_endpoint; + std::size_t pos; + + CURLcode res; + curl = curl_easy_init(); + if(!curl) return -1; + + endpoint += "/"; + endpoint += janus_session; + std::string postData = "{\"janus\" : \"attach\", \"plugin\" : \"janus.plugin.streaming\", \"transaction\" : \"randomString\"}"; + curl_easy_setopt(curl, CURLOPT_URL,endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData.c_str()); + res = curl_easy_perform(curl); + if (res != CURLE_OK) + { + Warning("Libcurl attempted %s got %s", endpoint.c_str(), curl_easy_strerror(res)); + curl_easy_cleanup(curl); + return -1; + } + + pos = response.find("\"id\": "); + if (pos == std::string::npos) + { + curl_easy_cleanup(curl); + return -1; + } + janus_handle = response.substr(pos + 6, 16); + curl_easy_cleanup(curl); + return 1; +} //get_janus_handle