From 5dbc69fb16c71c02859ec8665c55a1e2b068ddd7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 20 Jul 2015 22:33:39 +0200 Subject: o Adding an application to send samples over MQTT. o Improved CMake build script, better detection and error messages of headers/libraries. Conditionally adding the applications that can be compiled with the given set of found libraries. --- apps/CMakeLists.txt | 90 +++++++++++---- apps/apps.cpp | 10 +- apps/mqtt-publish.cpp | 275 +++++++++++++++++++++++++++++++++++++++++++++ sensor/CMakeLists.txt | 2 +- sensor/test/CMakeLists.txt | 2 +- test/CMakeLists.txt | 2 +- 6 files changed, 350 insertions(+), 31 deletions(-) create mode 100644 apps/mqtt-publish.cpp diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index b8255d1..411eb32 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -9,18 +9,41 @@ list(APPEND APPS sm-get-value) list(APPEND APPS sm-serial-read) list(APPEND APPS sm-serial-read-all) -add_library(apps OBJECT - apps.cpp apps.h - SoilMoisture.cpp SoilMoisture.h) -target_include_directories(apps - PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} - PUBLIC "${PROJECT_SOURCE_DIR}/include" - PUBLIC "${PROJECT_SOURCE_DIR}/json/src" - PUBLIC "${PROJECT_SOURCE_DIR}/sensor/include" - PUBLIC "${LOG4CPLUS_INCLUDE_DIRECTORIES}") +list(APPEND INCLUDE_DIRECTORIES + ${CMAKE_CURRENT_SOURCE_DIR} + "${PROJECT_SOURCE_DIR}/include" + "${PROJECT_SOURCE_DIR}/json/src" + "${PROJECT_SOURCE_DIR}/sensor/include") + +#### Find all packages + +function(find_header_and_lib PREFIX HEADER_NAME LIBRARY_NAME) +# message(STATUS "=== PREFIX ===") +# message(STATUS "PREFIX=${PREFIX}") +# message(STATUS "HEADER_NAME=${HEADER_NAME}") +# message(STATUS "LIBRARY_NAME=${LIBRARY_NAME}") + + find_path(${PREFIX}_INCLUDE_DIRECTORY ${HEADER_NAME}) + if(${${PREFIX}_INCLUDE_DIRECTORY} MATCHES NOTFOUND) + message(STATUS "Could not find header file: " ${HEADER_NAME}) + endif() + + find_library(${PREFIX}_LIBRARY ${LIBRARY_NAME}) + if(${${PREFIX}_LIBRARY} MATCHES NOTFOUND) + message(STATUS "Could not find library: " ${LIBRARY_NAME}) + endif() + + if(${${PREFIX}_INCLUDE_DIRECTORY} MATCHES NOTFOUND OR ${${PREFIX}_LIBRARY} MATCHES NOTFOUND) + set(${PREFIX}_OK NOTFOUND PARENT_SCOPE) + else() + set(${PREFIX}_INCLUDE_DIRECTORY ${${PREFIX}_INCLUDE_DIRECTORY} PARENT_SCOPE) + set(${PREFIX}_LIBRARY ${${PREFIX}_LIBRARY} PARENT_SCOPE) + set(${PREFIX}_OK OK PARENT_SCOPE) + endif() +endfunction() # Boost -find_package(Boost COMPONENTS regex system program_options REQUIRED) +find_package(Boost COMPONENTS regex system program_options REQUIRED QUIET) # Bluez pkg_check_modules(BLUEZ bluez REQUIRED) @@ -28,35 +51,56 @@ pkg_check_modules(BLUEZ bluez REQUIRED) # pthreads find_package(Threads REQUIRED) +# libpqxx pkg_check_modules(PQXX libpqxx REQUIRED) -find_path(LOG4CPLUS_INCLUDE_DIRECTORIES log4cplus/logger.h) -if(LOG4CPLUS_INCLUDE_DIRECTORIES MATCHES NOTFOUND) - message(FATAL_ERROR "Could not find log4cplus header files") +# log4cplus +find_header_and_lib(LOG4CPLUS log4cplus/logger.h log4cplus) + +if(NOT LOG4CPLUS_OK STREQUAL "OK") + message(STATUS "Not adding MQTT applications, missing header and/or library files") endif() -find_library(LOG4CPLUS_LIBRARIES log4cplus) -if(LOG4CPLUS_LIBRARIES MATCHES NOTFOUND) - message(FATAL_ERROR "Could not find log4cplus library files") +list(APPEND INCLUDE_DIRECTORIES "${LOG4CPLUS_INCLUDE_DIRECTORY}") +list(APPEND LIBRARIES "${LOG4CPLUS_LIBRARY}") + +# mosquitto + +find_header_and_lib(MOSQUITTO mosquitto.h mosquitto) +find_header_and_lib(MOSQUITTOPP mosquittopp.h mosquittopp) + +if(MOSQUITTO_OK STREQUAL "OK" AND MOSQUITTOPP_OK STREQUAL "OK") + list(APPEND APPS mqtt-publish) + list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTO_INCLUDE_DIRECTORY}") + list(APPEND INCLUDE_DIRECTORIES "${MOSQUITTOPP_INCLUDE_DIRECTORY}") + list(APPEND LIBRARIES "${MOSQUITTO_LIBRARY}") + list(APPEND LIBRARIES "${MOSQUITTOPP_LIBRARY}") +else() + message(STATUS "Not adding MQTT applications, missing header and/or library files") endif() +message(STATUS "Dependency summary:") +message(STATUS "Log4cplus: -I${LOG4CPLUS_INCLUDE_DIRECTORY}, -L${LOG4CPLUS_LIBRARY}") +message(STATUS "Mosquitto: -I${MOSQUITTO_INCLUDE_DIRECTORY}, -L${MOSQUITTO_LIBRARY}") +message(STATUS "Mosquittopp: -I${MOSQUITTOPP_INCLUDE_DIRECTORY}, -L${MOSQUITTOPP_LIBRARY}") + +add_library(apps OBJECT + apps.cpp apps.h + SoilMoisture.cpp SoilMoisture.h) +target_include_directories(apps PUBLIC ${INCLUDE_DIRECTORIES}) + foreach(app ${APPS}) add_executable(${app} ${app}.cpp $) - target_include_directories(${app} - PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} - PUBLIC "${PROJECT_SOURCE_DIR}/include" - PUBLIC "${PROJECT_SOURCE_DIR}/json/src" - PUBLIC "${PROJECT_SOURCE_DIR}/sensor/include" - PUBLIC "${LOG4CPLUS_INCLUDE_DIRECTORIES}") + target_include_directories(${app} PUBLIC ${INCLUDE_DIRECTORIES}) target_link_libraries(${app} ble trygvis-sensor + ${LIBRARIES} ${Boost_LIBRARIES} ${BLUEZ_LIBRARIES} ${PQXX_LIBRARIES} - ${LOG4CPLUS_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) install(TARGETS ${app} RUNTIME DESTINATION bin) diff --git a/apps/apps.cpp b/apps/apps.cpp index 5ff8bf1..b21d27e 100644 --- a/apps/apps.cpp +++ b/apps/apps.cpp @@ -3,10 +3,6 @@ #include #include #include -#include -#include -#include -#include namespace trygvis { namespace apps { @@ -105,7 +101,11 @@ int launch_app(app *app, int argc, const char *argv[]) { app_execution execution(all, vm, logger); - return app->main(execution); + int ret = app->main(execution); + + delete app; + + return ret; } catch (po::required_option &e) { cerr << "Missing required option: " << e.get_option_name() << endl; cerr << all << endl; diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp new file mode 100644 index 0000000..63ce998 --- /dev/null +++ b/apps/mqtt-publish.cpp @@ -0,0 +1,275 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "SoilMoisture.h" +#include "trygvis/sensor.h" +#include "trygvis/sensor/io.h" +#include "apps.h" + +namespace trygvis { +namespace apps { + +using namespace std; +using namespace std::chrono; +using namespace trygvis::apps; +using namespace trygvis::sensor; +using namespace trygvis::sensor::io; +using namespace mosqpp; + +class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp { +public: + MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name, + unsigned int keep_alive) + : SampleOutputStream(), + mosquittopp(client_id, clean_session), + host(host), + port(port), + topic_name(topic_name), + keep_alive(keep_alive), + connected(false), + should_reconnect(false) { + } + + ~MqttSampleOutputStream() { + close(); + }; + + void connect() { + should_reconnect = true; + int err; + if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) { + string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": "; + msg += mosqpp::strerror(err); + throw sample_exception(msg); + } + + err = loop_start(); + if (err) { + string msg = "Could not start network loop thread: "; + msg += mosqpp::strerror(err); + throw sample_exception(msg); + } + } + + void close() { + if (connected) { + LOG4CPLUS_INFO(logger, "Closing connection"); + + should_reconnect = false; + + int rc; + if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) { + LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc)); + } + } + + loop_stop(false); + } + + void on_connect(int rc) override { + LOG4CPLUS_INFO(logger, "Connected"); + if (rc == MOSQ_ERR_SUCCESS) { + connected = true; + } + } + + void on_disconnect(int rc) override { + if (!connected) { + return; + } + + if (should_reconnect) { + LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc)); + rc = reconnect_async(); + if (rc != MOSQ_ERR_SUCCESS) { + LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc)); + } + } else { + LOG4CPLUS_INFO(logger, "Disconnected"); + } + + connected = false; + } + + void on_log(int level, const char *str) override { + log4cplus::LogLevel l; + + if (level == MOSQ_LOG_INFO) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_NOTICE) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_WARNING) { + l = log4cplus::WARN_LOG_LEVEL; + } else if (level == MOSQ_LOG_ERR) { + l = log4cplus::FATAL_LOG_LEVEL; + } else if (level == MOSQ_LOG_DEBUG) { + l = log4cplus::DEBUG_LOG_LEVEL; + } else { + l = log4cplus::DEBUG_LOG_LEVEL; + } + + if ((logger).isEnabledFor(l)) { + log4cplus::tostringstream _log4cplus_buf; + _log4cplus_buf << "mosquitto: " << str; + (logger).forcedLog(l, _log4cplus_buf.str()); + } + } + + void on_publish(int message_id) override { + LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); + + cv.notify_all(); + } + + void wait() { + std::unique_lock lk(cv_mutex); + cv.wait(lk); + } + + void write(SampleRecord const &sample) override { + if (sample.empty()) { + return; + } + + // make a string of the sample + auto buf = make_shared(); + KeyValueSampleOutputStream out(buf, sample.dict); + out.write(sample); + string s = buf->str(); + + cout << "sample: " << s; + + // Publish the message + int message_id; + const char *message = s.c_str(); + + int err; + if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { + LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); + } else { + LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); + } + } + + const string host, topic_name; + const unsigned int port; + const unsigned int keep_alive; + + const int qos = 2; + const bool retain = true; + +private: + atomic_bool connected, should_reconnect; + Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); + condition_variable cv; + mutex cv_mutex; +}; + +class mosquitto_raii { +public: + mosquitto_raii() { + mosquitto_lib_init(); + } + + ~mosquitto_raii() { + mosquitto_lib_init(); + } +}; + +class mqtt_publish : public app { +public: + mqtt_publish() : app("mqtt-publish") {} + + ~mqtt_publish() = default; + + KeyDictionary dict; + sample_format_type format; + string client_id, host, input_file, topic_name; + unsigned int port, keep_alive; + bool clean_session; + + void add_options(po::options_description_easy_init &options) override { + auto format_opt = po::value(&format)->default_value(sample_format_type::KEY_VALUE); + auto input_file_opt = po::value(&input_file)->default_value("-"); + auto topic_opt = po::value(&topic_name)->required(); + auto keep_alive_opt = po::value(&keep_alive)->default_value(60); + + options("client-id", po::value(&client_id), "Client id"); + options("clean-session", po::value(&clean_session), "Instruct the broker to clean any existing session state"); + options("host", po::value(&host)->default_value("localhost"), "MQTT broker host name"); + options("port", po::value(&port)->default_value(1883), "MQTT broker port"); + options("format", format_opt, "Formatting of message format"); + options("input-file", input_file_opt, "Input file, '-' means stdin"); + options("topic", topic_opt, "The topic to publish to"); + options("keep-alive", keep_alive_opt, "How often the broker should send PING messages"); + } + + int main(app_execution &execution) override { + mosquitto_raii mosquitto_raii; + + auto desc = execution.desc; + auto vm = execution.vm; + + try { + istream *inputStream; + if (input_file == "-") { + inputStream = &cin; + } else { + inputStream = new ifstream(input_file); + if (inputStream->fail()) { + cerr << "Unable to open input file " << input_file << endl; + return EXIT_FAILURE; + } + } + + const char *client_id_ = nullptr; + + if (!vm["client-id"].empty()) { + client_id_ = client_id.c_str(); + } else { + clean_session = true; + } + + auto output = + make_shared(client_id_, clean_session, host, port, topic_name, keep_alive); + auto input = make_shared(output, dict); + + output->connect(); + + char data[100]; + while (!inputStream->eof()) { + inputStream->get(data[0]); + auto buf = boost::asio::buffer(data, 1); + input->process(buf); + } + + while (output->want_write()) { + output->wait(); + } + + output->close(); + + return EXIT_SUCCESS; + } catch (std::runtime_error ex) { + cout << "std::runtime_error: " << ex.what() << endl; + return EXIT_FAILURE; + } catch (std::exception ex) { + cout << "std::exception: " << ex.what() << endl; + return EXIT_FAILURE; + } + } +}; +} +} + +int main(int argc, const char *argv[]) { + using namespace trygvis::apps; + + return real_main(new mqtt_publish(), argc, argv); +} diff --git a/sensor/CMakeLists.txt b/sensor/CMakeLists.txt index c814687..d9f2156 100644 --- a/sensor/CMakeLists.txt +++ b/sensor/CMakeLists.txt @@ -10,7 +10,7 @@ target_include_directories(trygvis-sensor PUBLIC "${PROJECT_SOURCE_DIR}/json/src target_include_directories(trygvis-sensor PUBLIC include) # Boost -find_package(Boost COMPONENTS regex system REQUIRED) +find_package(Boost COMPONENTS regex system REQUIRED QUIET) add_subdirectory(test) diff --git a/sensor/test/CMakeLists.txt b/sensor/test/CMakeLists.txt index 5c2d527..53d833c 100644 --- a/sensor/test/CMakeLists.txt +++ b/sensor/test/CMakeLists.txt @@ -1,4 +1,4 @@ -find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED) +find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED QUIET) # If we can change directory here add_definition and test-specific stuff could be moved to the test directory file(GLOB TEST_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *Test.cpp) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d1feed8..4671329 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,4 +1,4 @@ -find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED) +find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED QUIET) # If we can change directory here add_definition and test-specific stuff could be moved to the test directory file(GLOB TEST_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *Test.cpp) -- cgit v1.2.3