diff options
Diffstat (limited to 'apps')
-rw-r--r-- | apps/CMakeLists.txt | 9 | ||||
-rw-r--r-- | apps/apps.h | 4 | ||||
-rw-r--r-- | apps/mqtt-publish.cpp | 183 | ||||
-rw-r--r-- | apps/mqtt_support.cpp | 16 | ||||
-rw-r--r-- | apps/mqtt_support.h | 419 | ||||
-rw-r--r-- | apps/sm-db-insert.cpp | 3 | ||||
-rw-r--r-- | apps/sm-db-select.cpp | 2 | ||||
-rw-r--r-- | apps/sm-get-value.cpp | 4 |
8 files changed, 477 insertions, 163 deletions
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index 411eb32..34fbd30 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -13,6 +13,7 @@ list(APPEND INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR} "${PROJECT_SOURCE_DIR}/include" "${PROJECT_SOURCE_DIR}/json/src" + "${PROJECT_SOURCE_DIR}/gsl/include" "${PROJECT_SOURCE_DIR}/sensor/include") #### Find all packages @@ -67,14 +68,12 @@ list(APPEND LIBRARIES "${LOG4CPLUS_LIBRARY}") # mosquitto find_header_and_lib(MOSQUITTO mosquitto.h mosquitto) -find_header_and_lib(MOSQUITTOPP mosquittopp.h mosquittopp) -if(MOSQUITTO_OK STREQUAL "OK" AND MOSQUITTOPP_OK STREQUAL "OK") +if(MOSQUITTO_OK STREQUAL "OK") list(APPEND APPS mqtt-publish) + set(mqtt-publish_SOURCES mqtt_support.cpp mqtt_support.h) list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTO_INCLUDE_DIRECTORY}") - list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTOPP_INCLUDE_DIRECTORY}") list(APPEND LIBRARIES "${MOSQUITTO_LIBRARY}") - list(APPEND LIBRARIES "${MOSQUITTOPP_LIBRARY}") else() message(STATUS "Not adding MQTT applications, missing header and/or library files") endif() @@ -90,7 +89,7 @@ add_library(apps OBJECT target_include_directories(apps PUBLIC ${INCLUDE_DIRECTORIES}) foreach(app ${APPS}) - add_executable(${app} ${app}.cpp $<TARGET_OBJECTS:apps>) + add_executable(${app} ${app}.cpp $<TARGET_OBJECTS:apps> ${${app}_SOURCES}) target_include_directories(${app} PUBLIC ${INCLUDE_DIRECTORIES}) diff --git a/apps/apps.h b/apps/apps.h index d54110b..f3d6eae 100644 --- a/apps/apps.h +++ b/apps/apps.h @@ -6,11 +6,15 @@ #include <boost/program_options/variables_map.hpp> #include <stdexcept> #include <iosfwd> +#include <experimental/optional> #include <json.hpp> namespace trygvis { namespace apps { +template<typename T> +using o = std::experimental::optional<T>; + namespace po = boost::program_options; using namespace log4cplus; diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp index 77b0c5a..25e8ae9 100644 --- a/apps/mqtt-publish.cpp +++ b/apps/mqtt-publish.cpp @@ -5,7 +5,8 @@ #include <iomanip> #include <boost/uuid/uuid_io.hpp> #include <thread> -#include <mosquittopp.h> +#include <log4cplus/loggingmacros.h> +#include "mqtt_support.h" #include "SoilMoisture.h" #include "trygvis/sensor.h" #include "trygvis/sensor/io.h" @@ -19,124 +20,23 @@ using namespace std::chrono; using namespace trygvis::apps; using namespace trygvis::sensor; using namespace trygvis::sensor::io; -using namespace mosqpp; +using namespace trygvis::mqtt_support; -class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp { +class MqttSampleOutputStream : public SampleOutputStream { public: - MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name, + MqttSampleOutputStream(const o<string> &client_id, bool clean_session, string host, unsigned int port, + string topic_name, unsigned int keep_alive) - : SampleOutputStream(), - mosquittopp(client_id, clean_session), - host(host), - port(port), - topic_name(topic_name), - keep_alive(keep_alive), - connected(false), - should_reconnect(false), - unacked_messages_(0){ + : SampleOutputStream(), + client(host, port, keep_alive, client_id, clean_session), + topic_name(topic_name) { + client.connect(); } ~MqttSampleOutputStream() { - close(); + client.disconnect(); }; - void connect() { - should_reconnect = true; - int err; - if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) { - string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": "; - msg += mosqpp::strerror(err); - throw sample_exception(msg); - } - - err = loop_start(); - if (err) { - string msg = "Could not start network loop thread: "; - msg += mosqpp::strerror(err); - throw sample_exception(msg); - } - } - - void close() { - if (connected) { - LOG4CPLUS_INFO(logger, "Closing connection"); - - should_reconnect = false; - - int rc; - if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) { - LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc)); - } - } - - loop_stop(false); - } - - void on_connect(int rc) override { - LOG4CPLUS_INFO(logger, "Connected"); - if (rc == MOSQ_ERR_SUCCESS) { - connected = true; - } - } - - void on_disconnect(int rc) override { - if (!connected) { - return; - } - - if (should_reconnect) { - LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc)); - rc = reconnect_async(); - if (rc != MOSQ_ERR_SUCCESS) { - LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc)); - } - } else { - LOG4CPLUS_INFO(logger, "Disconnected"); - } - - connected = false; - } - - void on_log(int level, const char *str) override { - log4cplus::LogLevel l; - - if (level == MOSQ_LOG_INFO) { - l = log4cplus::INFO_LOG_LEVEL; - } else if (level == MOSQ_LOG_NOTICE) { - l = log4cplus::INFO_LOG_LEVEL; - } else if (level == MOSQ_LOG_WARNING) { - l = log4cplus::WARN_LOG_LEVEL; - } else if (level == MOSQ_LOG_ERR) { - l = log4cplus::FATAL_LOG_LEVEL; - } else if (level == MOSQ_LOG_DEBUG) { - l = log4cplus::DEBUG_LOG_LEVEL; - } else { - l = log4cplus::DEBUG_LOG_LEVEL; - } - - if ((logger).isEnabledFor(l)) { - log4cplus::tostringstream _log4cplus_buf; - _log4cplus_buf << "mosquitto: " << str; - (logger).forcedLog(l, _log4cplus_buf.str()); - } - } - - void on_publish(int message_id) override { - LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); - - cv.notify_all(); - unacked_messages_--; - } - - void wait() { - std::unique_lock<std::mutex> lk(cv_mutex); - cv.wait(lk); - } - - int unacked_messages() { - return unacked_messages_; - } - void write(SampleRecord const &sample) override { if (sample.empty()) { return; @@ -154,44 +54,19 @@ public: int message_id; const char *message = s.c_str(); - int err; - if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { - LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); - } else { - unacked_messages_++; - LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); - } + client.publish(&message_id, topic_name, qos, retain, static_cast<int>(s.length()), s.c_str()); } - const string host, topic_name; - const unsigned int port; - const unsigned int keep_alive; + const string topic_name; + mqtt_client<mqtt_client_personality::threaded> client; const int qos = 2; const bool retain = true; - -private: - atomic_bool connected, should_reconnect; - Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); - atomic_int unacked_messages_; - condition_variable cv; - mutex cv_mutex; -}; - -class mosquitto_raii { -public: - mosquitto_raii() { - mosquitto_lib_init(); - } - - ~mosquitto_raii() { - mosquitto_lib_init(); - } }; class mqtt_publish : public app { public: - mqtt_publish() : app("mqtt-publish") {} + mqtt_publish() : app("mqtt-publish") { } ~mqtt_publish() = default; @@ -218,7 +93,7 @@ public: } int main(app_execution &execution) override { - mosquitto_raii mosquitto_raii; + mqtt_lib mqtt_lib; auto desc = execution.desc; auto vm = execution.vm; @@ -235,41 +110,45 @@ public: } } - const char *client_id_ = nullptr; + o <string> client_id_; if (!vm["client-id"].empty()) { - client_id_ = client_id.c_str(); + client_id_ = client_id; } else { clean_session = true; } - auto output = - make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name, keep_alive); + auto output = make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name, + keep_alive); auto input = make_shared<KeyValueSampleStreamParser>(output, dict); - output->connect(); +// while (!output->client.connected()) { +// cout << "Waiting for connection" << endl; +// output->client.wait(); +// } char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); + cout << "got data: " << inputStream->gcount() << endl; auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } input->finish(); - while (output->unacked_messages()) { - output->wait(); + while (output->client.unacked_messages()) { + cout << "finishing.. unacked messages: " << output->client.unacked_messages() << endl; + output->client.wait(); } - output->close(); - return EXIT_SUCCESS; - } catch (std::runtime_error ex) { + } catch (std::runtime_error &ex) { cout << "std::runtime_error: " << ex.what() << endl; return EXIT_FAILURE; - } catch (std::exception ex) { + } catch (std::exception &ex) { cout << "std::exception: " << ex.what() << endl; + cout << "typeid: " << typeid(ex).name() << endl; return EXIT_FAILURE; } } diff --git a/apps/mqtt_support.cpp b/apps/mqtt_support.cpp new file mode 100644 index 0000000..cb8431d --- /dev/null +++ b/apps/mqtt_support.cpp @@ -0,0 +1,16 @@ +#include "mqtt_support.h" + +namespace trygvis { +namespace mqtt_support { + +using namespace std; + +int mqtt_lib::version_major; +int mqtt_lib::version_minor; +int mqtt_lib::version_revision; + +atomic_int mqtt_lib::mqtt_client_instance_count(0); +mutex mqtt_lib::mqtt_client_mutex_; + +} +} diff --git a/apps/mqtt_support.h b/apps/mqtt_support.h new file mode 100644 index 0000000..48a6c39 --- /dev/null +++ b/apps/mqtt_support.h @@ -0,0 +1,419 @@ +#ifndef TRYGVIS_MQTT_SUPPORT_H +#define TRYGVIS_MQTT_SUPPORT_H + +#include <mutex> +#include <string> +#include <exception> +#include <cstring> +#include <span.h> +#include <log4cplus/logger.h> +#include <log4cplus/loggingmacros.h> +#include <atomic> +#include <condition_variable> +#include <limits.h> +#include <experimental/optional> +#include "mosquitto.h" + +namespace trygvis { +namespace mqtt_support { + +template<typename T> +using o = std::experimental::optional<T>; + +using namespace std; +using namespace log4cplus; +using namespace gsl; + +static inline +string error_to_string(int rc) { + if (rc == MOSQ_ERR_ERRNO) { + return string(strerror(errno)); + } + return string(mosquitto_strerror(rc)); +} + +class mqtt_error : public std::runtime_error { + +public: + const int error; + + mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) { + } +}; + +class mqtt_lib { +public: + mqtt_lib() { + if (mqtt_client_instance_count++ == 0) { + lock_guard<mutex> l(mqtt_client_mutex_); + int rc = mosquitto_lib_init(); + + if (rc != MOSQ_ERR_SUCCESS) { + throw mqtt_error("Unable to initialize mosquitto: " + error_to_string(rc), rc); + } + + mosquitto_lib_version(&version_major, &version_minor, &version_revision); + } + } + + virtual ~mqtt_lib() { + if (--mqtt_client_instance_count == 0) { + lock_guard<mutex> l(mqtt_client_mutex_); + + mosquitto_lib_cleanup(); + } + } + + static int version_major; + static int version_minor; + static int version_revision; + +private: + static atomic_int mqtt_client_instance_count; + static mutex mqtt_client_mutex_; +}; + +enum mqtt_client_personality { + threaded, + polling +}; + +template<mqtt_client_personality personality> +class mqtt_client : private mqtt_lib { + template<bool> + struct personality_tag { + }; + + typedef personality_tag<mqtt_client_personality::threaded> threaded_tag; + typedef personality_tag<mqtt_client_personality::polling> polling_tag; + const personality_tag<personality> p_tag{}; + + struct mosquitto *mosquitto; + + const string host; + const int port; + const int keep_alive; + + recursive_mutex this_mutex; + using guard = lock_guard<recursive_mutex>; + + bool connecting_, connected_; +// bool should_reconnect_; + + int unacked_messages_; + condition_variable cv; + mutex cv_mutex; + + void assert_success(const string &function, int rc) { + if (rc != MOSQ_ERR_SUCCESS) { + throw mqtt_error(function + ": " + error_to_string(rc), rc); + } + } + +public: + mqtt_client(const string &host, const int port, const int keep_alive, const o<string> &client_id, + const bool clean_session) : + host(host), port(port), connecting_(false), connected_(false), /*should_reconnect_(false),*/ + keep_alive(keep_alive), unacked_messages_(0) { + mosquitto = mosquitto_new(client_id ? (*client_id).c_str() : nullptr, clean_session, this); + if (!mosquitto) { + string err = strerror(errno); + throw runtime_error("Could not initialize mosquitto instance: " + err); + } + mosquitto_connect_callback_set(mosquitto, on_connect_cb); + mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); + mosquitto_publish_callback_set(mosquitto, on_publish_cb); + mosquitto_message_callback_set(mosquitto, on_message_cb); + mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); + mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); + mosquitto_log_callback_set(mosquitto, on_log_cb); + + post_construct(p_tag); + } + +private: + void post_construct(threaded_tag) { + LOG4CPLUS_INFO(logger, "mosquitto_loop_start"); + int rc = mosquitto_loop_start(mosquitto); + assert_success("mosquitto_loop_start", rc); + } + + void post_construct(polling_tag) { + } + +public: + + virtual ~mqtt_client() { +// should_reconnect_ = false; + pre_destruct(p_tag); + + disconnect(); + } + +private: + void pre_destruct(threaded_tag) { + int rc = mosquitto_loop_stop(mosquitto, true); + if (rc) { + LOG4CPLUS_WARN(logger, "mosquitto_loop_stop: " << error_to_string(rc)); + } + } + + void pre_destruct(polling_tag) { + } + +public: + void wait() { + unique_lock<mutex> lk(cv_mutex); + cv.wait(lk); + } + + int unacked_messages() { + guard lock(this_mutex); + return unacked_messages_; + } + + bool connected() { + guard lock(this_mutex); + + return connected_; + } + + bool connecting() { + guard lock(this_mutex); + + return connecting_; + } + + void connect() { + guard lock(this_mutex); + + LOG4CPLUS_INFO(logger, "Connecting to " << host << ":" << port << ", keep_alive=" << keep_alive); + + if (connecting_ || connected_) { + disconnect(); + } + + connect(p_tag); + } + +private: + void connect(threaded_tag) { + connecting_ = true; + connected_ = false; + + LOG4CPLUS_DEBUG(logger, "mosquitto_connect_async"); + int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive); + assert_success("mosquitto_connect_async", rc); + } + + void connect(polling_tag) { + connecting_ = false; + connected_ = true; + + LOG4CPLUS_DEBUG(logger, "mosquitto_connect"); + int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive); + assert_success("mosquitto_connect", rc); + } + +private: + void on_connect_wrapper(int rc) { + guard lock(this_mutex); + + connected_ = rc == MOSQ_ERR_SUCCESS; + connecting_ = false; + + if (connected_) { + LOG4CPLUS_INFO(logger, "Connected"); + } else { + LOG4CPLUS_INFO(logger, "Could not connect: " << error_to_string(rc)); + } + on_connect(rc); + + cv.notify_all(); + } + + void on_disconnect_wrapper(int rc) { + guard lock(this_mutex); + + LOG4CPLUS_INFO(logger, "Disconnected"); + + bool was_connecting = connecting_, was_connected = connected_; + connecting_ = connected_ = false; + unacked_messages_ = 0; + + on_disconnect(was_connecting, was_connected, rc); + + cv.notify_all(); + +// if (should_reconnect_) { +// LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << error_to_string(rc)); +// this->connect(); +// if (rc != MOSQ_ERR_SUCCESS) { +// LOG4CPLUS_WARN(logger, "Error when reconnecting: " << error_to_string(rc)); +// } +// } else { +// LOG4CPLUS_INFO(logger, "Disconnected"); +// } + } + + void on_publish_wrapper(int message_id) { + guard lock(this_mutex); + + LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); + unacked_messages_--; + + on_publish(message_id); + + cv.notify_all(); + } + + void on_message_wrapper(const struct mosquitto_message *message) { + guard lock(this_mutex); + on_message(message); + } + + void on_subscribe_wrapper(int mid, int qos_count, const int *granted_qos) { + guard lock(this_mutex); + on_subscribe(mid, mid, granted_qos); + } + + void on_unsubscribe_wrapper(int mid) { + guard lock(this_mutex); + on_unsubscribe(mid); + } + + void on_log_wrapper(int level, const char *str) { + guard lock(this_mutex); + + log4cplus::LogLevel l; + + if (level == MOSQ_LOG_INFO) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_NOTICE) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_WARNING) { + l = log4cplus::WARN_LOG_LEVEL; + } else if (level == MOSQ_LOG_ERR) { + l = log4cplus::FATAL_LOG_LEVEL; + } else { + l = log4cplus::DEBUG_LOG_LEVEL; + } + + if (logger.isEnabledFor(l)) { + log4cplus::tostringstream _log4cplus_buf; + _log4cplus_buf << "mosquitto: " << str; + logger.forcedLog(l, _log4cplus_buf.str()); + } + + on_log(level, str); + } + +public: + void disconnect() { + LOG4CPLUS_INFO(logger, "Disconnecting, connected: " << (connected() ? "yes" : "no")); + int rc = mosquitto_disconnect(mosquitto); + LOG4CPLUS_DEBUG(logger, "mosquitto_disconnect: " << error_to_string(rc)); + } + + void subscribe(int *mid, const string &topic, int qos) { + int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos); + assert_success("mosquitto_subscribe", rc); + } + + void publish(int *mid, const string &topic, int qos, bool retain, int payload_len, const void *payload) { +// if (!connected_) { +// throw mqtt_error("not connected", MOSQ_ERR_NO_CONN); +// } + + LOG4CPLUS_DEBUG(logger, "Publishing " << payload_len << " bytes to " << topic); + + int rc = mosquitto_publish(mosquitto, mid, topic.c_str(), payload_len, payload, qos, retain); + + if(rc == MOSQ_ERR_SUCCESS) { + guard lock(this_mutex); + unacked_messages_++; + } + + assert_success("mosquitto_publish", rc); + } + +// void set_should_reconnect(bool should_reconnect) { +// this->should_reconnect_ = should_reconnect; +// } + +public: + void poll() { + poll(p_tag); + } + +private: + void poll(threaded_tag) { + } + + void poll(polling_tag) { + int rc = mosquitto_loop(mosquitto, 100, 1); + assert_success("mosquitto_loop", rc); + } + + // ------------------------------------------- + // Callbacks + // ------------------------------------------- + +protected: + virtual void on_connect(int rc) { + } + + virtual void on_disconnect(bool was_connecting, bool was_connected, int rc) { + } + + virtual void on_publish(int mid) { + } + + virtual void on_message(const struct mosquitto_message *message) { + } + + virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) { + } + + virtual void on_unsubscribe(int mid) { + } + + virtual void on_log(int level, const char *str) { + } + +private: + static void on_connect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<mqtt_client *>(self)->on_connect_wrapper(rc); + } + + static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<mqtt_client *>(self)->on_disconnect_wrapper(rc); + } + + static void on_publish_cb(struct mosquitto *m, void *self, int rc) { + static_cast<mqtt_client *>(self)->on_publish_wrapper(rc); + } + + static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { + static_cast<mqtt_client *>(self)->on_message_wrapper(message); + } + + static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { + static_cast<mqtt_client *>(self)->on_subscribe_wrapper(mid, qos_count, granted_qos); + } + + static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { + static_cast<mqtt_client *>(self)->on_unsubscribe_wrapper(mid); + } + + static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { + static_cast<mqtt_client *>(self)->on_log_wrapper(level, str); + } + + Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt_client")); +}; + +} +} + +#endif diff --git a/apps/sm-db-insert.cpp b/apps/sm-db-insert.cpp index d42807e..2b94c19 100644 --- a/apps/sm-db-insert.cpp +++ b/apps/sm-db-insert.cpp @@ -1,4 +1,3 @@ -#include <boost/optional.hpp> #include <boost/lexical_cast.hpp> #include <pqxx/connection.hxx> #include <pqxx/transaction.hxx> @@ -9,8 +8,6 @@ namespace trygvis { namespace apps { -template <class T> -using o = boost::optional<T>; using namespace std; using json = nlohmann::json; diff --git a/apps/sm-db-select.cpp b/apps/sm-db-select.cpp index d9d7166..f66a284 100644 --- a/apps/sm-db-select.cpp +++ b/apps/sm-db-select.cpp @@ -8,8 +8,6 @@ namespace trygvis { namespace apps { -template <class T> -using o = boost::optional<T>; using namespace std; using json = nlohmann::json; diff --git a/apps/sm-get-value.cpp b/apps/sm-get-value.cpp index 43729d8..0572155 100644 --- a/apps/sm-get-value.cpp +++ b/apps/sm-get-value.cpp @@ -3,6 +3,8 @@ #include <chrono> #include <boost/uuid/uuid_io.hpp> #include <thread> +#include <log4cplus/logger.h> +#include <log4cplus/loggingmacros.h> #include "ble/Bluetooth.h" #include "SoilMoisture.h" #include "trygvis/sensor.h" @@ -157,7 +159,7 @@ public: } } - for_each(begin(sensorIndexes), end(sensorIndexes), [&](auto i) { + for_each(begin(sensorIndexes), end(sensorIndexes), [&](uint8_t i) { if (i >= sensorCount) { // Ignore invalid sensors return; |