Add MQTT_Enabled. parse subscriptions, sucribe to them. Add mqtt messages for starting/ending events

This commit is contained in:
Isaac Connor 2021-12-29 18:24:02 -05:00
parent 0f8684a9b9
commit 498cb85f64
2 changed files with 35 additions and 17 deletions

View File

@ -83,7 +83,8 @@ std::string load_monitor_sql =
"`SectionLength`, `MinSectionLength`, `FrameSkip`, `MotionFrameSkip`, "
"`FPSReportInterval`, `RefBlendPerc`, `AlarmRefBlendPerc`, `TrackMotion`, `Exif`,"
"`RTSPServer`, `RTSPStreamName`,"
"`SignalCheckPoints`, `SignalCheckColour`, `Importance`-1, `MQTT_Subscriptions` FROM `Monitors`";
"`SignalCheckPoints`, `SignalCheckColour`, `Importance`-1, "
"`MQTT_Enabled`, `MQTT_Subscriptions` FROM `Monitors`";
std::string CameraType_Strings[] = {
"Unknown",
@ -408,6 +409,9 @@ Monitor::Monitor()
dest_frame(nullptr),
convert_context(nullptr),
//zones(nullptr),
mqtt_enabled(false),
// mqtt_subscriptions,
mqtt(nullptr),
privacy_bitmask(nullptr),
n_linked_monitors(0),
linked_monitors(nullptr),
@ -417,20 +421,13 @@ Monitor::Monitor()
grayscale_val(0),
colour_val(0)
{
if ( strcmp(config.event_close_mode, "time") == 0 )
if (strcmp(config.event_close_mode, "time") == 0) {
event_close_mode = CLOSE_TIME;
else if ( strcmp(config.event_close_mode, "alarm") == 0 )
} else if (strcmp(config.event_close_mode, "alarm") == 0) {
event_close_mode = CLOSE_ALARM;
else
} else {
event_close_mode = CLOSE_IDLE;
event = nullptr;
last_section_mod = 0;
adaptive_skip = true;
videoStore = nullptr;
}
} // Monitor::Monitor
/*
@ -447,7 +444,7 @@ Monitor::Monitor()
"SectionLength, MinSectionLength, FrameSkip, MotionFrameSkip, "
"FPSReportInterval, RefBlendPerc, AlarmRefBlendPerc, TrackMotion, Exif,"
"`RTSPServer`,`RTSPStreamName`,
"SignalCheckPoints, SignalCheckColour, Importance-1, `MQTT_Subscriptions` FROM Monitors";
"SignalCheckPoints, SignalCheckColour, Importance-1, `MQTT_Enabled`, `MQTT_Subscriptions` FROM Monitors";
*/
void Monitor::Load(MYSQL_ROW dbrow, bool load_zones=true, Purpose p = QUERY) {
@ -631,7 +628,10 @@ void Monitor::Load(MYSQL_ROW dbrow, bool load_zones=true, Purpose p = QUERY) {
importance = dbrow[col] ? atoi(dbrow[col]) : 0; col++;
if (importance < 0) importance = 0; // Should only be >= 0
mqtt_subscriptions = Split(std::string(dbrow[col] ? dbrow[col] : ""), ','); col++;
mqtt_enabled = (*dbrow[col] != '0'); col++;
std::string mqtt_subscriptions_string = std::string(dbrow[col] ? dbrow[col] : "");
mqtt_subscriptions = Split(mqtt_subscriptions_string, ','); col++;
Error("MQTT enableed ? %d, subs %s", mqtt_enabled, mqtt_subscriptions_string.c_str());
// How many frames we need to have before we start analysing
ready_count = std::max(warmup_count, pre_event_count);
@ -1050,6 +1050,14 @@ bool Monitor::connect() {
video_store_data->size = sizeof(VideoStoreData);
usedsubpixorder = camera->SubpixelOrder(); // Used in CheckSignal
shared_data->valid = true;
if (mqtt_enabled) {
mqtt = zm::make_unique<MQTT>(this);
for (const std::string &subscription : mqtt_subscriptions) {
if (!subscription.empty())
mqtt->add_subscription(subscription);
}
}
} else if (!shared_data->valid) {
Error("Shared data not initialised by capture daemon for monitor %s", name.c_str());
return false;
@ -1115,6 +1123,9 @@ bool Monitor::disconnect() {
} // end bool Monitor::disconnect()
Monitor::~Monitor() {
if (mqtt) {
mqtt->send("offline");
}
Close();
if (mem_ptr != nullptr) {
@ -1694,6 +1705,9 @@ void Monitor::UpdateFPS() {
Info("%s: %d - Capturing at %.2lf fps, capturing bandwidth %ubytes/sec Analysing at %.2lf fps",
name.c_str(), image_count, new_capture_fps, new_capture_bandwidth, new_analysis_fps);
if (mqtt) mqtt->send(stringtf("Capturing at %.2lf fps, capturing bandwidth %ubytes/sec Analysing at %.2lf fps",
new_capture_fps, new_capture_bandwidth, new_analysis_fps));
shared_data->capture_fps = new_capture_fps;
last_fps_time = now;
last_capture_image_count = image_count;
@ -2705,6 +2719,8 @@ Event * Monitor::openEvent(
shared_data->last_event_id = event->Id();
strncpy(shared_data->alarm_cause, cause.c_str(), sizeof(shared_data->alarm_cause)-1);
if (mqtt) mqtt->send(stringtf("event start: %" PRId64, event->Id()));
if (!event_start_command.empty()) {
if (fork() == 0) {
execlp(event_start_command.c_str(), event_start_command.c_str(), std::to_string(event->Id()).c_str(), nullptr);
@ -2744,18 +2760,19 @@ void Monitor::closeEvent() {
} else {
Debug(1, "close event thread is not joinable");
}
if (mqtt) mqtt->send(stringtf("event end: %" PRId64, event->Id()));
Debug(1, "Starting thread to close event");
close_event_thread = std::thread([](Event *e, const std::string &command){
int64_t event_id = e->Id();
delete e;
if (!command.empty()) {
if (fork() == 0) {
execlp(command.c_str(), command.c_str(), std::to_string(event_id).c_str(), nullptr);
Error("Error execing %s", command.c_str());
}
}
}, event, event_end_command);
Debug(1, "Nulling event");
event = nullptr;

View File

@ -401,6 +401,7 @@ protected:
std::vector<Zone> zones;
bool mqtt_enabled;
std::vector<std::string> mqtt_subscriptions;
std::unique_ptr<MQTT> mqtt;