aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2016-02-14 14:41:52 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2016-02-14 14:41:52 +0100
commit8ded9e3d0bbc2d7cdc5b9f01b4fed9c8685caf82 (patch)
treee155a951afe91f4ddf349ce70a71150943ed8cfd
parentf6493150c1a7172bcd8c9cc1790829285f707ee9 (diff)
downloadble-toys-8ded9e3d0bbc2d7cdc5b9f01b4fed9c8685caf82.tar.gz
ble-toys-8ded9e3d0bbc2d7cdc5b9f01b4fed9c8685caf82.tar.bz2
ble-toys-8ded9e3d0bbc2d7cdc5b9f01b4fed9c8685caf82.tar.xz
ble-toys-8ded9e3d0bbc2d7cdc5b9f01b4fed9c8685caf82.zip
mqtt: Using mqtt_support utilities from the mqtt-cassandra bridge.
-rw-r--r--.gitignore3
-rw-r--r--.gitmodules3
-rw-r--r--apps/CMakeLists.txt9
-rw-r--r--apps/apps.h4
-rw-r--r--apps/mqtt-publish.cpp183
-rw-r--r--apps/mqtt_support.cpp16
-rw-r--r--apps/mqtt_support.h419
-rw-r--r--apps/sm-db-insert.cpp3
-rw-r--r--apps/sm-db-select.cpp2
-rw-r--r--apps/sm-get-value.cpp4
-rw-r--r--ble/LinuxBluetooth.cpp2
m---------gsl0
m---------json0
-rw-r--r--sensor/main/io.cpp12
-rw-r--r--test/ByteBufferTest.cpp1
15 files changed, 491 insertions, 170 deletions
diff --git a/.gitignore b/.gitignore
index cdaf6a8..74c7978 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,6 @@ noble
.idea
build
*.log
+*.tmp.*
+tmp.*
+*.tmp
diff --git a/.gitmodules b/.gitmodules
index c971d9f..a168a2f 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
[submodule "json"]
path = json
url = https://github.com/nlohmann/json.git
+[submodule "gsl"]
+ path = gsl
+ url = https://github.com/Microsoft/GSL
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index 411eb32..34fbd30 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -13,6 +13,7 @@ list(APPEND INCLUDE_DIRECTORIES
${CMAKE_CURRENT_SOURCE_DIR}
"${PROJECT_SOURCE_DIR}/include"
"${PROJECT_SOURCE_DIR}/json/src"
+ "${PROJECT_SOURCE_DIR}/gsl/include"
"${PROJECT_SOURCE_DIR}/sensor/include")
#### Find all packages
@@ -67,14 +68,12 @@ list(APPEND LIBRARIES "${LOG4CPLUS_LIBRARY}")
# mosquitto
find_header_and_lib(MOSQUITTO mosquitto.h mosquitto)
-find_header_and_lib(MOSQUITTOPP mosquittopp.h mosquittopp)
-if(MOSQUITTO_OK STREQUAL "OK" AND MOSQUITTOPP_OK STREQUAL "OK")
+if(MOSQUITTO_OK STREQUAL "OK")
list(APPEND APPS mqtt-publish)
+ set(mqtt-publish_SOURCES mqtt_support.cpp mqtt_support.h)
list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTO_INCLUDE_DIRECTORY}")
- list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTOPP_INCLUDE_DIRECTORY}")
list(APPEND LIBRARIES "${MOSQUITTO_LIBRARY}")
- list(APPEND LIBRARIES "${MOSQUITTOPP_LIBRARY}")
else()
message(STATUS "Not adding MQTT applications, missing header and/or library files")
endif()
@@ -90,7 +89,7 @@ add_library(apps OBJECT
target_include_directories(apps PUBLIC ${INCLUDE_DIRECTORIES})
foreach(app ${APPS})
- add_executable(${app} ${app}.cpp $<TARGET_OBJECTS:apps>)
+ add_executable(${app} ${app}.cpp $<TARGET_OBJECTS:apps> ${${app}_SOURCES})
target_include_directories(${app} PUBLIC ${INCLUDE_DIRECTORIES})
diff --git a/apps/apps.h b/apps/apps.h
index d54110b..f3d6eae 100644
--- a/apps/apps.h
+++ b/apps/apps.h
@@ -6,11 +6,15 @@
#include <boost/program_options/variables_map.hpp>
#include <stdexcept>
#include <iosfwd>
+#include <experimental/optional>
#include <json.hpp>
namespace trygvis {
namespace apps {
+template<typename T>
+using o = std::experimental::optional<T>;
+
namespace po = boost::program_options;
using namespace log4cplus;
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
index 77b0c5a..25e8ae9 100644
--- a/apps/mqtt-publish.cpp
+++ b/apps/mqtt-publish.cpp
@@ -5,7 +5,8 @@
#include <iomanip>
#include <boost/uuid/uuid_io.hpp>
#include <thread>
-#include <mosquittopp.h>
+#include <log4cplus/loggingmacros.h>
+#include "mqtt_support.h"
#include "SoilMoisture.h"
#include "trygvis/sensor.h"
#include "trygvis/sensor/io.h"
@@ -19,124 +20,23 @@ using namespace std::chrono;
using namespace trygvis::apps;
using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
-using namespace mosqpp;
+using namespace trygvis::mqtt_support;
-class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp {
+class MqttSampleOutputStream : public SampleOutputStream {
public:
- MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name,
+ MqttSampleOutputStream(const o<string> &client_id, bool clean_session, string host, unsigned int port,
+ string topic_name,
unsigned int keep_alive)
- : SampleOutputStream(),
- mosquittopp(client_id, clean_session),
- host(host),
- port(port),
- topic_name(topic_name),
- keep_alive(keep_alive),
- connected(false),
- should_reconnect(false),
- unacked_messages_(0){
+ : SampleOutputStream(),
+ client(host, port, keep_alive, client_id, clean_session),
+ topic_name(topic_name) {
+ client.connect();
}
~MqttSampleOutputStream() {
- close();
+ client.disconnect();
};
- void connect() {
- should_reconnect = true;
- int err;
- if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) {
- string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": ";
- msg += mosqpp::strerror(err);
- throw sample_exception(msg);
- }
-
- err = loop_start();
- if (err) {
- string msg = "Could not start network loop thread: ";
- msg += mosqpp::strerror(err);
- throw sample_exception(msg);
- }
- }
-
- void close() {
- if (connected) {
- LOG4CPLUS_INFO(logger, "Closing connection");
-
- should_reconnect = false;
-
- int rc;
- if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) {
- LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc));
- }
- }
-
- loop_stop(false);
- }
-
- void on_connect(int rc) override {
- LOG4CPLUS_INFO(logger, "Connected");
- if (rc == MOSQ_ERR_SUCCESS) {
- connected = true;
- }
- }
-
- void on_disconnect(int rc) override {
- if (!connected) {
- return;
- }
-
- if (should_reconnect) {
- LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc));
- rc = reconnect_async();
- if (rc != MOSQ_ERR_SUCCESS) {
- LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc));
- }
- } else {
- LOG4CPLUS_INFO(logger, "Disconnected");
- }
-
- connected = false;
- }
-
- void on_log(int level, const char *str) override {
- log4cplus::LogLevel l;
-
- if (level == MOSQ_LOG_INFO) {
- l = log4cplus::INFO_LOG_LEVEL;
- } else if (level == MOSQ_LOG_NOTICE) {
- l = log4cplus::INFO_LOG_LEVEL;
- } else if (level == MOSQ_LOG_WARNING) {
- l = log4cplus::WARN_LOG_LEVEL;
- } else if (level == MOSQ_LOG_ERR) {
- l = log4cplus::FATAL_LOG_LEVEL;
- } else if (level == MOSQ_LOG_DEBUG) {
- l = log4cplus::DEBUG_LOG_LEVEL;
- } else {
- l = log4cplus::DEBUG_LOG_LEVEL;
- }
-
- if ((logger).isEnabledFor(l)) {
- log4cplus::tostringstream _log4cplus_buf;
- _log4cplus_buf << "mosquitto: " << str;
- (logger).forcedLog(l, _log4cplus_buf.str());
- }
- }
-
- void on_publish(int message_id) override {
- LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
-
- cv.notify_all();
- unacked_messages_--;
- }
-
- void wait() {
- std::unique_lock<std::mutex> lk(cv_mutex);
- cv.wait(lk);
- }
-
- int unacked_messages() {
- return unacked_messages_;
- }
-
void write(SampleRecord const &sample) override {
if (sample.empty()) {
return;
@@ -154,44 +54,19 @@ public:
int message_id;
const char *message = s.c_str();
- int err;
- if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) {
- LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err));
- } else {
- unacked_messages_++;
- LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id);
- }
+ client.publish(&message_id, topic_name, qos, retain, static_cast<int>(s.length()), s.c_str());
}
- const string host, topic_name;
- const unsigned int port;
- const unsigned int keep_alive;
+ const string topic_name;
+ mqtt_client<mqtt_client_personality::threaded> client;
const int qos = 2;
const bool retain = true;
-
-private:
- atomic_bool connected, should_reconnect;
- Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt"));
- atomic_int unacked_messages_;
- condition_variable cv;
- mutex cv_mutex;
-};
-
-class mosquitto_raii {
-public:
- mosquitto_raii() {
- mosquitto_lib_init();
- }
-
- ~mosquitto_raii() {
- mosquitto_lib_init();
- }
};
class mqtt_publish : public app {
public:
- mqtt_publish() : app("mqtt-publish") {}
+ mqtt_publish() : app("mqtt-publish") { }
~mqtt_publish() = default;
@@ -218,7 +93,7 @@ public:
}
int main(app_execution &execution) override {
- mosquitto_raii mosquitto_raii;
+ mqtt_lib mqtt_lib;
auto desc = execution.desc;
auto vm = execution.vm;
@@ -235,41 +110,45 @@ public:
}
}
- const char *client_id_ = nullptr;
+ o <string> client_id_;
if (!vm["client-id"].empty()) {
- client_id_ = client_id.c_str();
+ client_id_ = client_id;
} else {
clean_session = true;
}
- auto output =
- make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name, keep_alive);
+ auto output = make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name,
+ keep_alive);
auto input = make_shared<KeyValueSampleStreamParser>(output, dict);
- output->connect();
+// while (!output->client.connected()) {
+// cout << "Waiting for connection" << endl;
+// output->client.wait();
+// }
char data[100];
while (!inputStream->eof()) {
inputStream->get(data[0]);
+ cout << "got data: " << inputStream->gcount() << endl;
auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount());
input->process(buf);
}
input->finish();
- while (output->unacked_messages()) {
- output->wait();
+ while (output->client.unacked_messages()) {
+ cout << "finishing.. unacked messages: " << output->client.unacked_messages() << endl;
+ output->client.wait();
}
- output->close();
-
return EXIT_SUCCESS;
- } catch (std::runtime_error ex) {
+ } catch (std::runtime_error &ex) {
cout << "std::runtime_error: " << ex.what() << endl;
return EXIT_FAILURE;
- } catch (std::exception ex) {
+ } catch (std::exception &ex) {
cout << "std::exception: " << ex.what() << endl;
+ cout << "typeid: " << typeid(ex).name() << endl;
return EXIT_FAILURE;
}
}
diff --git a/apps/mqtt_support.cpp b/apps/mqtt_support.cpp
new file mode 100644
index 0000000..cb8431d
--- /dev/null
+++ b/apps/mqtt_support.cpp
@@ -0,0 +1,16 @@
+#include "mqtt_support.h"
+
+namespace trygvis {
+namespace mqtt_support {
+
+using namespace std;
+
+int mqtt_lib::version_major;
+int mqtt_lib::version_minor;
+int mqtt_lib::version_revision;
+
+atomic_int mqtt_lib::mqtt_client_instance_count(0);
+mutex mqtt_lib::mqtt_client_mutex_;
+
+}
+}
diff --git a/apps/mqtt_support.h b/apps/mqtt_support.h
new file mode 100644
index 0000000..48a6c39
--- /dev/null
+++ b/apps/mqtt_support.h
@@ -0,0 +1,419 @@
+#ifndef TRYGVIS_MQTT_SUPPORT_H
+#define TRYGVIS_MQTT_SUPPORT_H
+
+#include <mutex>
+#include <string>
+#include <exception>
+#include <cstring>
+#include <span.h>
+#include <log4cplus/logger.h>
+#include <log4cplus/loggingmacros.h>
+#include <atomic>
+#include <condition_variable>
+#include <limits.h>
+#include <experimental/optional>
+#include "mosquitto.h"
+
+namespace trygvis {
+namespace mqtt_support {
+
+template<typename T>
+using o = std::experimental::optional<T>;
+
+using namespace std;
+using namespace log4cplus;
+using namespace gsl;
+
+static inline
+string error_to_string(int rc) {
+ if (rc == MOSQ_ERR_ERRNO) {
+ return string(strerror(errno));
+ }
+ return string(mosquitto_strerror(rc));
+}
+
+class mqtt_error : public std::runtime_error {
+
+public:
+ const int error;
+
+ mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) {
+ }
+};
+
+class mqtt_lib {
+public:
+ mqtt_lib() {
+ if (mqtt_client_instance_count++ == 0) {
+ lock_guard<mutex> l(mqtt_client_mutex_);
+ int rc = mosquitto_lib_init();
+
+ if (rc != MOSQ_ERR_SUCCESS) {
+ throw mqtt_error("Unable to initialize mosquitto: " + error_to_string(rc), rc);
+ }
+
+ mosquitto_lib_version(&version_major, &version_minor, &version_revision);
+ }
+ }
+
+ virtual ~mqtt_lib() {
+ if (--mqtt_client_instance_count == 0) {
+ lock_guard<mutex> l(mqtt_client_mutex_);
+
+ mosquitto_lib_cleanup();
+ }
+ }
+
+ static int version_major;
+ static int version_minor;
+ static int version_revision;
+
+private:
+ static atomic_int mqtt_client_instance_count;
+ static mutex mqtt_client_mutex_;
+};
+
+enum mqtt_client_personality {
+ threaded,
+ polling
+};
+
+template<mqtt_client_personality personality>
+class mqtt_client : private mqtt_lib {
+ template<bool>
+ struct personality_tag {
+ };
+
+ typedef personality_tag<mqtt_client_personality::threaded> threaded_tag;
+ typedef personality_tag<mqtt_client_personality::polling> polling_tag;
+ const personality_tag<personality> p_tag{};
+
+ struct mosquitto *mosquitto;
+
+ const string host;
+ const int port;
+ const int keep_alive;
+
+ recursive_mutex this_mutex;
+ using guard = lock_guard<recursive_mutex>;
+
+ bool connecting_, connected_;
+// bool should_reconnect_;
+
+ int unacked_messages_;
+ condition_variable cv;
+ mutex cv_mutex;
+
+ void assert_success(const string &function, int rc) {
+ if (rc != MOSQ_ERR_SUCCESS) {
+ throw mqtt_error(function + ": " + error_to_string(rc), rc);
+ }
+ }
+
+public:
+ mqtt_client(const string &host, const int port, const int keep_alive, const o<string> &client_id,
+ const bool clean_session) :
+ host(host), port(port), connecting_(false), connected_(false), /*should_reconnect_(false),*/
+ keep_alive(keep_alive), unacked_messages_(0) {
+ mosquitto = mosquitto_new(client_id ? (*client_id).c_str() : nullptr, clean_session, this);
+ if (!mosquitto) {
+ string err = strerror(errno);
+ throw runtime_error("Could not initialize mosquitto instance: " + err);
+ }
+ mosquitto_connect_callback_set(mosquitto, on_connect_cb);
+ mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
+ mosquitto_publish_callback_set(mosquitto, on_publish_cb);
+ mosquitto_message_callback_set(mosquitto, on_message_cb);
+ mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
+ mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
+ mosquitto_log_callback_set(mosquitto, on_log_cb);
+
+ post_construct(p_tag);
+ }
+
+private:
+ void post_construct(threaded_tag) {
+ LOG4CPLUS_INFO(logger, "mosquitto_loop_start");
+ int rc = mosquitto_loop_start(mosquitto);
+ assert_success("mosquitto_loop_start", rc);
+ }
+
+ void post_construct(polling_tag) {
+ }
+
+public:
+
+ virtual ~mqtt_client() {
+// should_reconnect_ = false;
+ pre_destruct(p_tag);
+
+ disconnect();
+ }
+
+private:
+ void pre_destruct(threaded_tag) {
+ int rc = mosquitto_loop_stop(mosquitto, true);
+ if (rc) {
+ LOG4CPLUS_WARN(logger, "mosquitto_loop_stop: " << error_to_string(rc));
+ }
+ }
+
+ void pre_destruct(polling_tag) {
+ }
+
+public:
+ void wait() {
+ unique_lock<mutex> lk(cv_mutex);
+ cv.wait(lk);
+ }
+
+ int unacked_messages() {
+ guard lock(this_mutex);
+ return unacked_messages_;
+ }
+
+ bool connected() {
+ guard lock(this_mutex);
+
+ return connected_;
+ }
+
+ bool connecting() {
+ guard lock(this_mutex);
+
+ return connecting_;
+ }
+
+ void connect() {
+ guard lock(this_mutex);
+
+ LOG4CPLUS_INFO(logger, "Connecting to " << host << ":" << port << ", keep_alive=" << keep_alive);
+
+ if (connecting_ || connected_) {
+ disconnect();
+ }
+
+ connect(p_tag);
+ }
+
+private:
+ void connect(threaded_tag) {
+ connecting_ = true;
+ connected_ = false;
+
+ LOG4CPLUS_DEBUG(logger, "mosquitto_connect_async");
+ int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive);
+ assert_success("mosquitto_connect_async", rc);
+ }
+
+ void connect(polling_tag) {
+ connecting_ = false;
+ connected_ = true;
+
+ LOG4CPLUS_DEBUG(logger, "mosquitto_connect");
+ int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive);
+ assert_success("mosquitto_connect", rc);
+ }
+
+private:
+ void on_connect_wrapper(int rc) {
+ guard lock(this_mutex);
+
+ connected_ = rc == MOSQ_ERR_SUCCESS;
+ connecting_ = false;
+
+ if (connected_) {
+ LOG4CPLUS_INFO(logger, "Connected");
+ } else {
+ LOG4CPLUS_INFO(logger, "Could not connect: " << error_to_string(rc));
+ }
+ on_connect(rc);
+
+ cv.notify_all();
+ }
+
+ void on_disconnect_wrapper(int rc) {
+ guard lock(this_mutex);
+
+ LOG4CPLUS_INFO(logger, "Disconnected");
+
+ bool was_connecting = connecting_, was_connected = connected_;
+ connecting_ = connected_ = false;
+ unacked_messages_ = 0;
+
+ on_disconnect(was_connecting, was_connected, rc);
+
+ cv.notify_all();
+
+// if (should_reconnect_) {
+// LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << error_to_string(rc));
+// this->connect();
+// if (rc != MOSQ_ERR_SUCCESS) {
+// LOG4CPLUS_WARN(logger, "Error when reconnecting: " << error_to_string(rc));
+// }
+// } else {
+// LOG4CPLUS_INFO(logger, "Disconnected");
+// }
+ }
+
+ void on_publish_wrapper(int message_id) {
+ guard lock(this_mutex);
+
+ LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
+ unacked_messages_--;
+
+ on_publish(message_id);
+
+ cv.notify_all();
+ }
+
+ void on_message_wrapper(const struct mosquitto_message *message) {
+ guard lock(this_mutex);
+ on_message(message);
+ }
+
+ void on_subscribe_wrapper(int mid, int qos_count, const int *granted_qos) {
+ guard lock(this_mutex);
+ on_subscribe(mid, mid, granted_qos);
+ }
+
+ void on_unsubscribe_wrapper(int mid) {
+ guard lock(this_mutex);
+ on_unsubscribe(mid);
+ }
+
+ void on_log_wrapper(int level, const char *str) {
+ guard lock(this_mutex);
+
+ log4cplus::LogLevel l;
+
+ if (level == MOSQ_LOG_INFO) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_NOTICE) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_WARNING) {
+ l = log4cplus::WARN_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_ERR) {
+ l = log4cplus::FATAL_LOG_LEVEL;
+ } else {
+ l = log4cplus::DEBUG_LOG_LEVEL;
+ }
+
+ if (logger.isEnabledFor(l)) {
+ log4cplus::tostringstream _log4cplus_buf;
+ _log4cplus_buf << "mosquitto: " << str;
+ logger.forcedLog(l, _log4cplus_buf.str());
+ }
+
+ on_log(level, str);
+ }
+
+public:
+ void disconnect() {
+ LOG4CPLUS_INFO(logger, "Disconnecting, connected: " << (connected() ? "yes" : "no"));
+ int rc = mosquitto_disconnect(mosquitto);
+ LOG4CPLUS_DEBUG(logger, "mosquitto_disconnect: " << error_to_string(rc));
+ }
+
+ void subscribe(int *mid, const string &topic, int qos) {
+ int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos);
+ assert_success("mosquitto_subscribe", rc);
+ }
+
+ void publish(int *mid, const string &topic, int qos, bool retain, int payload_len, const void *payload) {
+// if (!connected_) {
+// throw mqtt_error("not connected", MOSQ_ERR_NO_CONN);
+// }
+
+ LOG4CPLUS_DEBUG(logger, "Publishing " << payload_len << " bytes to " << topic);
+
+ int rc = mosquitto_publish(mosquitto, mid, topic.c_str(), payload_len, payload, qos, retain);
+
+ if(rc == MOSQ_ERR_SUCCESS) {
+ guard lock(this_mutex);
+ unacked_messages_++;
+ }
+
+ assert_success("mosquitto_publish", rc);
+ }
+
+// void set_should_reconnect(bool should_reconnect) {
+// this->should_reconnect_ = should_reconnect;
+// }
+
+public:
+ void poll() {
+ poll(p_tag);
+ }
+
+private:
+ void poll(threaded_tag) {
+ }
+
+ void poll(polling_tag) {
+ int rc = mosquitto_loop(mosquitto, 100, 1);
+ assert_success("mosquitto_loop", rc);
+ }
+
+ // -------------------------------------------
+ // Callbacks
+ // -------------------------------------------
+
+protected:
+ virtual void on_connect(int rc) {
+ }
+
+ virtual void on_disconnect(bool was_connecting, bool was_connected, int rc) {
+ }
+
+ virtual void on_publish(int mid) {
+ }
+
+ virtual void on_message(const struct mosquitto_message *message) {
+ }
+
+ virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) {
+ }
+
+ virtual void on_unsubscribe(int mid) {
+ }
+
+ virtual void on_log(int level, const char *str) {
+ }
+
+private:
+ static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_connect_wrapper(rc);
+ }
+
+ static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_disconnect_wrapper(rc);
+ }
+
+ static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_publish_wrapper(rc);
+ }
+
+ static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
+ static_cast<mqtt_client *>(self)->on_message_wrapper(message);
+ }
+
+ static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
+ static_cast<mqtt_client *>(self)->on_subscribe_wrapper(mid, qos_count, granted_qos);
+ }
+
+ static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
+ static_cast<mqtt_client *>(self)->on_unsubscribe_wrapper(mid);
+ }
+
+ static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
+ static_cast<mqtt_client *>(self)->on_log_wrapper(level, str);
+ }
+
+ Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt_client"));
+};
+
+}
+}
+
+#endif
diff --git a/apps/sm-db-insert.cpp b/apps/sm-db-insert.cpp
index d42807e..2b94c19 100644
--- a/apps/sm-db-insert.cpp
+++ b/apps/sm-db-insert.cpp
@@ -1,4 +1,3 @@
-#include <boost/optional.hpp>
#include <boost/lexical_cast.hpp>
#include <pqxx/connection.hxx>
#include <pqxx/transaction.hxx>
@@ -9,8 +8,6 @@
namespace trygvis {
namespace apps {
-template <class T>
-using o = boost::optional<T>;
using namespace std;
using json = nlohmann::json;
diff --git a/apps/sm-db-select.cpp b/apps/sm-db-select.cpp
index d9d7166..f66a284 100644
--- a/apps/sm-db-select.cpp
+++ b/apps/sm-db-select.cpp
@@ -8,8 +8,6 @@
namespace trygvis {
namespace apps {
-template <class T>
-using o = boost::optional<T>;
using namespace std;
using json = nlohmann::json;
diff --git a/apps/sm-get-value.cpp b/apps/sm-get-value.cpp
index 43729d8..0572155 100644
--- a/apps/sm-get-value.cpp
+++ b/apps/sm-get-value.cpp
@@ -3,6 +3,8 @@
#include <chrono>
#include <boost/uuid/uuid_io.hpp>
#include <thread>
+#include <log4cplus/logger.h>
+#include <log4cplus/loggingmacros.h>
#include "ble/Bluetooth.h"
#include "SoilMoisture.h"
#include "trygvis/sensor.h"
@@ -157,7 +159,7 @@ public:
}
}
- for_each(begin(sensorIndexes), end(sensorIndexes), [&](auto i) {
+ for_each(begin(sensorIndexes), end(sensorIndexes), [&](uint8_t i) {
if (i >= sensorCount) {
// Ignore invalid sensors
return;
diff --git a/ble/LinuxBluetooth.cpp b/ble/LinuxBluetooth.cpp
index b774644..587016c 100644
--- a/ble/LinuxBluetooth.cpp
+++ b/ble/LinuxBluetooth.cpp
@@ -123,7 +123,7 @@ Mac parseMac(bdaddr_t &a) {
// -----------------------------------------------------------------------
LinuxBluetoothDevice::LinuxBluetoothDevice(LinuxBluetoothAdapter &adapter, Mac &mac) :
- DefaultBluetoothDevice(adapter, mac) {
+ DefaultBluetoothDevice(adapter, mac), gatt(nullptr) {
}
LinuxBluetoothDevice::~LinuxBluetoothDevice() {
diff --git a/gsl b/gsl
new file mode 160000
+Subproject 6b82ac3d9c944727793469482849ee9f8f16f7b
diff --git a/json b/json
-Subproject c012b29ae5397af1608842ba915ad6b4133a030
+Subproject 2c720b26abcd8ef757291f2baf1311a4aab5812
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index 74bc796..96a98f6 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -215,7 +215,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
return;
}
- auto &s = *stream.get();
+ auto s = stream.get();
bool first = true;
if (!dict.empty()) {
@@ -228,9 +228,9 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
if (first) {
first = false;
} else {
- s << ", ";
+ *s << ", ";
}
- s << key->name << "=" << value.get();
+ *s << key->name << "=" << value.get();
}
}
} else {
@@ -241,16 +241,16 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
if (first) {
first = false;
} else {
- s << ", ";
+ *s << ", ";
}
// Make sure that the key is registered in the dictionary
dict.indexOf(sampleKey->name);
- s << sampleKey->name << "=" << o.get();
+ *s << sampleKey->name << "=" << o.get();
}
}
}
- s << endl << flush;
+ *s << endl << flush;
}
RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
diff --git a/test/ByteBufferTest.cpp b/test/ByteBufferTest.cpp
index 8659f64..6a1c4cb 100644
--- a/test/ByteBufferTest.cpp
+++ b/test/ByteBufferTest.cpp
@@ -1,5 +1,6 @@
#include "ble/ByteBuffer.h"
#include <iomanip>
+#include <iostream>
#define BOOST_TEST_MODULE "ByteBuffer"