diff --git a/src/ebusd/datahandler.cpp b/src/ebusd/datahandler.cpp index 324105dc..864ee459 100755 --- a/src/ebusd/datahandler.cpp +++ b/src/ebusd/datahandler.cpp @@ -75,8 +75,11 @@ bool datahandler_register(UserInfo* userInfo, BusHandler* busHandler, MessageMap return success; } -void DataSink::notifyUpdate(Message* message) { +void DataSink::notifyUpdate(Message* message, bool changed) { if (message && message->hasLevel(m_levels)) { + if (m_changedOnly && !changed) { + return; + } m_updatedMessages[message->getKey()]++; } } diff --git a/src/ebusd/datahandler.h b/src/ebusd/datahandler.h index 8b5a0dce..9e93096e 100755 --- a/src/ebusd/datahandler.h +++ b/src/ebusd/datahandler.h @@ -143,8 +143,9 @@ class DataSink : virtual public DataHandler { * Constructor. * @param userInfo the @a UserInfo instance. * @param user the user name for determining the allowed access levels (fall back to default levels). + * @param changedOnly whether to handle changed messages only in the updates. */ - DataSink(const UserInfo* userInfo, const string& user) { + DataSink(const UserInfo* userInfo, const string& user, bool changedOnly) : m_changedOnly(changedOnly) { m_levels = userInfo->getLevels(userInfo->hasUser(user) ? user : ""); } @@ -159,8 +160,9 @@ class DataSink : virtual public DataHandler { /** * Notify the sink of an updated @a Message (not necessarily changed though). * @param message the updated @a Message. + * @param changed whether the message data changed since the last notification. */ - virtual void notifyUpdate(Message* message); + virtual void notifyUpdate(Message* message, bool changed); /** * Notify the sink of the latest update check result. @@ -178,6 +180,9 @@ class DataSink : virtual public DataHandler { /** the allowed access levels. */ string m_levels; + /** whether to handle changed messages only in the updates. */ + bool m_changedOnly; + /** a map of updated @p Message keys. */ map m_updatedMessages; }; diff --git a/src/ebusd/knxhandler.cpp b/src/ebusd/knxhandler.cpp index da2690e1..2bca8694 100644 --- a/src/ebusd/knxhandler.cpp +++ b/src/ebusd/knxhandler.cpp @@ -163,7 +163,7 @@ bool knxhandler_register(UserInfo* userInfo, BusHandler* busHandler, MessageMap* } KnxHandler::KnxHandler(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages) - : DataSink(userInfo, "knx"), DataSource(busHandler), WaitThread(), m_messages(messages), + : DataSink(userInfo, "knx", true), DataSource(busHandler), WaitThread(), m_messages(messages), m_start(0), m_lastUpdateCheckResult("."), m_lastScanStatus(SCAN_STATUS_NONE), m_scanFinishReceived(false), m_lastErrorLogTime(0) { m_con = KnxConnection::create(g_url); @@ -709,7 +709,7 @@ void KnxHandler::handleGroupTelegram(knx_addr_t src, knx_addr_t dest, int len, c #define UPTIME_INTERVAL 3600 void KnxHandler::run() { - time_t lastTaskRun, now, lastSignal = 0, lastUptime = 0, lastUpdates = 0; + time_t lastTaskRun, now, lastSignal = 0, lastUptime = 0; bool signal = false; result_t result = RESULT_OK; time(&now); @@ -899,7 +899,6 @@ void KnxHandler::run() { if (!m_updatedMessages.empty()) { m_messages->lock(); if (m_con->isConnected()) { - time_t maxUpdates = 0; for (auto it = m_updatedMessages.begin(); it != m_updatedMessages.end(); ) { const vector* messages = m_messages->getByKey(it->first); if (!messages) { @@ -911,18 +910,10 @@ void KnxHandler::run() { if (changeTime <= 0) { continue; } - if (changeTime > lastUpdates && changeTime > maxUpdates) { - maxUpdates = changeTime; - } const auto mit = m_subscribedMessages.find(message->getKey()); if (mit == m_subscribedMessages.cend()) { continue; } - if (!(message->getDataHandlerState()&2)) { - message->setDataHandlerState(2, true); // first update still needed - } else if (changeTime <= lastUpdates) { - continue; - } for (auto destFlags : mit->second) { auto sit = m_subscribedGroups.find(destFlags); if (sit == m_subscribedGroups.end()) { @@ -941,7 +932,6 @@ void KnxHandler::run() { } it = m_updatedMessages.erase(it); } - lastUpdates = maxUpdates == 0 || lastUpdates > maxUpdates ? now : maxUpdates + 1; } else { m_updatedMessages.clear(); } diff --git a/src/ebusd/mainloop.cpp b/src/ebusd/mainloop.cpp index 32383250..66f2bd6e 100644 --- a/src/ebusd/mainloop.cpp +++ b/src/ebusd/mainloop.cpp @@ -352,8 +352,9 @@ void MainLoop::run() { m_messages->lock(); m_messages->findAll("", "", "*", false, true, true, true, true, true, sinkSince, now, false, &messages); for (const auto message : messages) { + bool changed = message->getLastChangeTime() >= sinkSince; for (const auto dataSink : dataSinks) { - dataSink->notifyUpdate(message); + dataSink->notifyUpdate(message, changed); } } m_messages->unlock(); diff --git a/src/ebusd/mqtthandler.cpp b/src/ebusd/mqtthandler.cpp index 6e1f17c1..b3204655 100755 --- a/src/ebusd/mqtthandler.cpp +++ b/src/ebusd/mqtthandler.cpp @@ -368,7 +368,8 @@ string removeTrailingNonTopicPart(const string& str) { } MqttHandler::MqttHandler(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages) - : DataSink(userInfo, "mqtt"), DataSource(busHandler), WaitThread(), m_messages(messages), m_connected(false), + : DataSink(userInfo, "mqtt", g_onlyChanges), DataSource(busHandler), WaitThread(), + m_messages(messages), m_connected(false), m_lastUpdateCheckResult("."), m_lastScanStatus(SCAN_STATUS_NONE) { m_definitionsSince = 0; m_client = nullptr; @@ -696,7 +697,7 @@ void splitFields(const string& str, vector* row) { } void MqttHandler::run() { - time_t lastTaskRun, now, start, lastSignal = 0, lastUpdates = 0; + time_t lastTaskRun, now, start, lastSignal = 0; bool signal = false; bool globalHasName = m_globalTopic.has("name"); string signalTopic = m_globalTopic.get("", "signal"); @@ -1068,27 +1069,21 @@ void MqttHandler::run() { if (!m_updatedMessages.empty()) { m_messages->lock(); if (m_connected) { - time_t maxUpdates = 0; for (auto it = m_updatedMessages.begin(); it != m_updatedMessages.end(); ) { const vector* messages = m_messages->getByKey(it->first); if (messages) { for (const auto& message : *messages) { time_t changeTime = message->getLastChangeTime(); - if (changeTime > 0 && message->isAvailable() - && (!g_onlyChanges || changeTime > lastUpdates)) { + if (changeTime > 0 && message->isAvailable()) { updates.str(""); updates.clear(); updates << dec; publishMessage(message, &updates); } - if (changeTime > lastUpdates && changeTime > maxUpdates) { - maxUpdates = changeTime; - } } } it = m_updatedMessages.erase(it); } - lastUpdates = maxUpdates == 0 || lastUpdates > maxUpdates ? now : maxUpdates + 1; } else { m_updatedMessages.clear(); }