aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apps/CMakeLists.txt90
-rw-r--r--apps/apps.cpp10
-rw-r--r--apps/mqtt-publish.cpp275
-rw-r--r--sensor/CMakeLists.txt2
-rw-r--r--sensor/test/CMakeLists.txt2
-rw-r--r--test/CMakeLists.txt2
6 files changed, 350 insertions, 31 deletions
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index b8255d1..411eb32 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -9,18 +9,41 @@ list(APPEND APPS sm-get-value)
list(APPEND APPS sm-serial-read)
list(APPEND APPS sm-serial-read-all)
-add_library(apps OBJECT
- apps.cpp apps.h
- SoilMoisture.cpp SoilMoisture.h)
-target_include_directories(apps
- PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}
- PUBLIC "${PROJECT_SOURCE_DIR}/include"
- PUBLIC "${PROJECT_SOURCE_DIR}/json/src"
- PUBLIC "${PROJECT_SOURCE_DIR}/sensor/include"
- PUBLIC "${LOG4CPLUS_INCLUDE_DIRECTORIES}")
+list(APPEND INCLUDE_DIRECTORIES
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ "${PROJECT_SOURCE_DIR}/include"
+ "${PROJECT_SOURCE_DIR}/json/src"
+ "${PROJECT_SOURCE_DIR}/sensor/include")
+
+#### Find all packages
+
+function(find_header_and_lib PREFIX HEADER_NAME LIBRARY_NAME)
+# message(STATUS "=== PREFIX ===")
+# message(STATUS "PREFIX=${PREFIX}")
+# message(STATUS "HEADER_NAME=${HEADER_NAME}")
+# message(STATUS "LIBRARY_NAME=${LIBRARY_NAME}")
+
+ find_path(${PREFIX}_INCLUDE_DIRECTORY ${HEADER_NAME})
+ if(${${PREFIX}_INCLUDE_DIRECTORY} MATCHES NOTFOUND)
+ message(STATUS "Could not find header file: " ${HEADER_NAME})
+ endif()
+
+ find_library(${PREFIX}_LIBRARY ${LIBRARY_NAME})
+ if(${${PREFIX}_LIBRARY} MATCHES NOTFOUND)
+ message(STATUS "Could not find library: " ${LIBRARY_NAME})
+ endif()
+
+ if(${${PREFIX}_INCLUDE_DIRECTORY} MATCHES NOTFOUND OR ${${PREFIX}_LIBRARY} MATCHES NOTFOUND)
+ set(${PREFIX}_OK NOTFOUND PARENT_SCOPE)
+ else()
+ set(${PREFIX}_INCLUDE_DIRECTORY ${${PREFIX}_INCLUDE_DIRECTORY} PARENT_SCOPE)
+ set(${PREFIX}_LIBRARY ${${PREFIX}_LIBRARY} PARENT_SCOPE)
+ set(${PREFIX}_OK OK PARENT_SCOPE)
+ endif()
+endfunction()
# Boost
-find_package(Boost COMPONENTS regex system program_options REQUIRED)
+find_package(Boost COMPONENTS regex system program_options REQUIRED QUIET)
# Bluez
pkg_check_modules(BLUEZ bluez REQUIRED)
@@ -28,35 +51,56 @@ pkg_check_modules(BLUEZ bluez REQUIRED)
# pthreads
find_package(Threads REQUIRED)
+# libpqxx
pkg_check_modules(PQXX libpqxx REQUIRED)
-find_path(LOG4CPLUS_INCLUDE_DIRECTORIES log4cplus/logger.h)
-if(LOG4CPLUS_INCLUDE_DIRECTORIES MATCHES NOTFOUND)
- message(FATAL_ERROR "Could not find log4cplus header files")
+# log4cplus
+find_header_and_lib(LOG4CPLUS log4cplus/logger.h log4cplus)
+
+if(NOT LOG4CPLUS_OK STREQUAL "OK")
+ message(STATUS "Not adding MQTT applications, missing header and/or library files")
endif()
-find_library(LOG4CPLUS_LIBRARIES log4cplus)
-if(LOG4CPLUS_LIBRARIES MATCHES NOTFOUND)
- message(FATAL_ERROR "Could not find log4cplus library files")
+list(APPEND INCLUDE_DIRECTORIES "${LOG4CPLUS_INCLUDE_DIRECTORY}")
+list(APPEND LIBRARIES "${LOG4CPLUS_LIBRARY}")
+
+# mosquitto
+
+find_header_and_lib(MOSQUITTO mosquitto.h mosquitto)
+find_header_and_lib(MOSQUITTOPP mosquittopp.h mosquittopp)
+
+if(MOSQUITTO_OK STREQUAL "OK" AND MOSQUITTOPP_OK STREQUAL "OK")
+ list(APPEND APPS mqtt-publish)
+ list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTO_INCLUDE_DIRECTORY}")
+ list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTOPP_INCLUDE_DIRECTORY}")
+ list(APPEND LIBRARIES "${MOSQUITTO_LIBRARY}")
+ list(APPEND LIBRARIES "${MOSQUITTOPP_LIBRARY}")
+else()
+ message(STATUS "Not adding MQTT applications, missing header and/or library files")
endif()
+message(STATUS "Dependency summary:")
+message(STATUS "Log4cplus: -I${LOG4CPLUS_INCLUDE_DIRECTORY}, -L${LOG4CPLUS_LIBRARY}")
+message(STATUS "Mosquitto: -I${MOSQUITTO_INCLUDE_DIRECTORY}, -L${MOSQUITTO_LIBRARY}")
+message(STATUS "Mosquittopp: -I${MOSQUITTOPP_INCLUDE_DIRECTORY}, -L${MOSQUITTOPP_LIBRARY}")
+
+add_library(apps OBJECT
+ apps.cpp apps.h
+ SoilMoisture.cpp SoilMoisture.h)
+target_include_directories(apps PUBLIC ${INCLUDE_DIRECTORIES})
+
foreach(app ${APPS})
add_executable(${app} ${app}.cpp $<TARGET_OBJECTS:apps>)
- target_include_directories(${app}
- PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}
- PUBLIC "${PROJECT_SOURCE_DIR}/include"
- PUBLIC "${PROJECT_SOURCE_DIR}/json/src"
- PUBLIC "${PROJECT_SOURCE_DIR}/sensor/include"
- PUBLIC "${LOG4CPLUS_INCLUDE_DIRECTORIES}")
+ target_include_directories(${app} PUBLIC ${INCLUDE_DIRECTORIES})
target_link_libraries(${app}
ble
trygvis-sensor
+ ${LIBRARIES}
${Boost_LIBRARIES}
${BLUEZ_LIBRARIES}
${PQXX_LIBRARIES}
- ${LOG4CPLUS_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT})
install(TARGETS ${app} RUNTIME DESTINATION bin)
diff --git a/apps/apps.cpp b/apps/apps.cpp
index 5ff8bf1..b21d27e 100644
--- a/apps/apps.cpp
+++ b/apps/apps.cpp
@@ -3,10 +3,6 @@
#include <log4cplus/hierarchy.h>
#include <boost/program_options.hpp>
#include <netdb.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <unistd.h>
namespace trygvis {
namespace apps {
@@ -105,7 +101,11 @@ int launch_app(app *app, int argc, const char *argv[]) {
app_execution execution(all, vm, logger);
- return app->main(execution);
+ int ret = app->main(execution);
+
+ delete app;
+
+ return ret;
} catch (po::required_option &e) {
cerr << "Missing required option: " << e.get_option_name() << endl;
cerr << all << endl;
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
new file mode 100644
index 0000000..63ce998
--- /dev/null
+++ b/apps/mqtt-publish.cpp
@@ -0,0 +1,275 @@
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <fstream>
+#include <iomanip>
+#include <chrono>
+#include <boost/uuid/uuid_io.hpp>
+#include <thread>
+#include <mosquittopp.h>
+#include "SoilMoisture.h"
+#include "trygvis/sensor.h"
+#include "trygvis/sensor/io.h"
+#include "apps.h"
+
+namespace trygvis {
+namespace apps {
+
+using namespace std;
+using namespace std::chrono;
+using namespace trygvis::apps;
+using namespace trygvis::sensor;
+using namespace trygvis::sensor::io;
+using namespace mosqpp;
+
+class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp {
+public:
+ MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name,
+ unsigned int keep_alive)
+ : SampleOutputStream(),
+ mosquittopp(client_id, clean_session),
+ host(host),
+ port(port),
+ topic_name(topic_name),
+ keep_alive(keep_alive),
+ connected(false),
+ should_reconnect(false) {
+ }
+
+ ~MqttSampleOutputStream() {
+ close();
+ };
+
+ void connect() {
+ should_reconnect = true;
+ int err;
+ if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) {
+ string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": ";
+ msg += mosqpp::strerror(err);
+ throw sample_exception(msg);
+ }
+
+ err = loop_start();
+ if (err) {
+ string msg = "Could not start network loop thread: ";
+ msg += mosqpp::strerror(err);
+ throw sample_exception(msg);
+ }
+ }
+
+ void close() {
+ if (connected) {
+ LOG4CPLUS_INFO(logger, "Closing connection");
+
+ should_reconnect = false;
+
+ int rc;
+ if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) {
+ LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc));
+ }
+ }
+
+ loop_stop(false);
+ }
+
+ void on_connect(int rc) override {
+ LOG4CPLUS_INFO(logger, "Connected");
+ if (rc == MOSQ_ERR_SUCCESS) {
+ connected = true;
+ }
+ }
+
+ void on_disconnect(int rc) override {
+ if (!connected) {
+ return;
+ }
+
+ if (should_reconnect) {
+ LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc));
+ rc = reconnect_async();
+ if (rc != MOSQ_ERR_SUCCESS) {
+ LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc));
+ }
+ } else {
+ LOG4CPLUS_INFO(logger, "Disconnected");
+ }
+
+ connected = false;
+ }
+
+ void on_log(int level, const char *str) override {
+ log4cplus::LogLevel l;
+
+ if (level == MOSQ_LOG_INFO) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_NOTICE) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_WARNING) {
+ l = log4cplus::WARN_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_ERR) {
+ l = log4cplus::FATAL_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_DEBUG) {
+ l = log4cplus::DEBUG_LOG_LEVEL;
+ } else {
+ l = log4cplus::DEBUG_LOG_LEVEL;
+ }
+
+ if ((logger).isEnabledFor(l)) {
+ log4cplus::tostringstream _log4cplus_buf;
+ _log4cplus_buf << "mosquitto: " << str;
+ (logger).forcedLog(l, _log4cplus_buf.str());
+ }
+ }
+
+ void on_publish(int message_id) override {
+ LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
+
+ cv.notify_all();
+ }
+
+ void wait() {
+ std::unique_lock<std::mutex> lk(cv_mutex);
+ cv.wait(lk);
+ }
+
+ void write(SampleRecord const &sample) override {
+ if (sample.empty()) {
+ return;
+ }
+
+ // make a string of the sample
+ auto buf = make_shared<stringstream>();
+ KeyValueSampleOutputStream out(buf, sample.dict);
+ out.write(sample);
+ string s = buf->str();
+
+ cout << "sample: " << s;
+
+ // Publish the message
+ int message_id;
+ const char *message = s.c_str();
+
+ int err;
+ if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) {
+ LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err));
+ } else {
+ LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id);
+ }
+ }
+
+ const string host, topic_name;
+ const unsigned int port;
+ const unsigned int keep_alive;
+
+ const int qos = 2;
+ const bool retain = true;
+
+private:
+ atomic_bool connected, should_reconnect;
+ Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt"));
+ condition_variable cv;
+ mutex cv_mutex;
+};
+
+class mosquitto_raii {
+public:
+ mosquitto_raii() {
+ mosquitto_lib_init();
+ }
+
+ ~mosquitto_raii() {
+ mosquitto_lib_init();
+ }
+};
+
+class mqtt_publish : public app {
+public:
+ mqtt_publish() : app("mqtt-publish") {}
+
+ ~mqtt_publish() = default;
+
+ KeyDictionary dict;
+ sample_format_type format;
+ string client_id, host, input_file, topic_name;
+ unsigned int port, keep_alive;
+ bool clean_session;
+
+ void add_options(po::options_description_easy_init &options) override {
+ auto format_opt = po::value(&format)->default_value(sample_format_type::KEY_VALUE);
+ auto input_file_opt = po::value(&input_file)->default_value("-");
+ auto topic_opt = po::value(&topic_name)->required();
+ auto keep_alive_opt = po::value(&keep_alive)->default_value(60);
+
+ options("client-id", po::value(&client_id), "Client id");
+ options("clean-session", po::value(&clean_session), "Instruct the broker to clean any existing session state");
+ options("host", po::value(&host)->default_value("localhost"), "MQTT broker host name");
+ options("port", po::value(&port)->default_value(1883), "MQTT broker port");
+ options("format", format_opt, "Formatting of message format");
+ options("input-file", input_file_opt, "Input file, '-' means stdin");
+ options("topic", topic_opt, "The topic to publish to");
+ options("keep-alive", keep_alive_opt, "How often the broker should send PING messages");
+ }
+
+ int main(app_execution &execution) override {
+ mosquitto_raii mosquitto_raii;
+
+ auto desc = execution.desc;
+ auto vm = execution.vm;
+
+ try {
+ istream *inputStream;
+ if (input_file == "-") {
+ inputStream = &cin;
+ } else {
+ inputStream = new ifstream(input_file);
+ if (inputStream->fail()) {
+ cerr << "Unable to open input file " << input_file << endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ const char *client_id_ = nullptr;
+
+ if (!vm["client-id"].empty()) {
+ client_id_ = client_id.c_str();
+ } else {
+ clean_session = true;
+ }
+
+ auto output =
+ make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name, keep_alive);
+ auto input = make_shared<KeyValueSampleStreamParser>(output, dict);
+
+ output->connect();
+
+ char data[100];
+ while (!inputStream->eof()) {
+ inputStream->get(data[0]);
+ auto buf = boost::asio::buffer(data, 1);
+ input->process(buf);
+ }
+
+ while (output->want_write()) {
+ output->wait();
+ }
+
+ output->close();
+
+ return EXIT_SUCCESS;
+ } catch (std::runtime_error ex) {
+ cout << "std::runtime_error: " << ex.what() << endl;
+ return EXIT_FAILURE;
+ } catch (std::exception ex) {
+ cout << "std::exception: " << ex.what() << endl;
+ return EXIT_FAILURE;
+ }
+ }
+};
+}
+}
+
+int main(int argc, const char *argv[]) {
+ using namespace trygvis::apps;
+
+ return real_main(new mqtt_publish(), argc, argv);
+}
diff --git a/sensor/CMakeLists.txt b/sensor/CMakeLists.txt
index c814687..d9f2156 100644
--- a/sensor/CMakeLists.txt
+++ b/sensor/CMakeLists.txt
@@ -10,7 +10,7 @@ target_include_directories(trygvis-sensor PUBLIC "${PROJECT_SOURCE_DIR}/json/src
target_include_directories(trygvis-sensor PUBLIC include)
# Boost
-find_package(Boost COMPONENTS regex system REQUIRED)
+find_package(Boost COMPONENTS regex system REQUIRED QUIET)
add_subdirectory(test)
diff --git a/sensor/test/CMakeLists.txt b/sensor/test/CMakeLists.txt
index 5c2d527..53d833c 100644
--- a/sensor/test/CMakeLists.txt
+++ b/sensor/test/CMakeLists.txt
@@ -1,4 +1,4 @@
-find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED)
+find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED QUIET)
# If we can change directory here add_definition and test-specific stuff could be moved to the test directory
file(GLOB TEST_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *Test.cpp)
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index d1feed8..4671329 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,4 +1,4 @@
-find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED)
+find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED QUIET)
# If we can change directory here add_definition and test-specific stuff could be moved to the test directory
file(GLOB TEST_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *Test.cpp)