diff --git a/lib/AmsData/include/AmsMqttHandler.h b/lib/AmsData/include/AmsMqttHandler.h deleted file mode 100644 index 08732819..00000000 --- a/lib/AmsData/include/AmsMqttHandler.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef _AMSMQTTHANDLER_H -#define _AMSMQTTHANDLER_H - -#include "Arduino.h" -#include -#include "AmsData.h" -#include "AmsConfiguration.h" -#include "EnergyAccounting.h" -#include "HwTools.h" -#include "EntsoeApi.h" - -#if defined(ESP32) -#include -#endif - -class AmsMqttHandler { -public: - AmsMqttHandler(MQTTClient* mqtt, char* buf) { - this->mqtt = mqtt; - this->json = buf; - }; - virtual ~AmsMqttHandler() {}; - - virtual bool publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi); - virtual bool publishTemperatures(AmsConfiguration*, HwTools*); - virtual bool publishPrices(EntsoeApi* eapi); - virtual bool publishSystem(HwTools*, EntsoeApi*, EnergyAccounting*); - -protected: - MQTTClient* mqtt; - char* json; - uint16_t BufferSize = 2048; - - bool loop(); -}; - -#endif diff --git a/lib/AmsMqttHandler/include/AmsMqttHandler.h b/lib/AmsMqttHandler/include/AmsMqttHandler.h new file mode 100644 index 00000000..684533bc --- /dev/null +++ b/lib/AmsMqttHandler/include/AmsMqttHandler.h @@ -0,0 +1,56 @@ +#ifndef _AMSMQTTHANDLER_H +#define _AMSMQTTHANDLER_H + +#include "Arduino.h" +#include +#include "AmsData.h" +#include "AmsConfiguration.h" +#include "EnergyAccounting.h" +#include "HwTools.h" +#include "EntsoeApi.h" + +#if defined(ESP32) +#include +#endif + +class AmsMqttHandler { +public: + AmsMqttHandler(MqttConfig& mqttConfig, RemoteDebug* debugger, char* buf) { + this->mqttConfig = mqttConfig; + this->debugger = debugger; + this->json = buf; + mqtt.dropOverflow(true); + }; + + bool connect(); + void disconnect(); + lwmqtt_err_t lastError(); + bool connected(); + bool loop(); + + virtual uint8_t getFormat() { return 0; }; + + virtual bool publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi) { return false; }; + virtual bool publishTemperatures(AmsConfiguration*, HwTools*) { return false; }; + virtual bool publishPrices(EntsoeApi* eapi) { return false; }; + virtual bool publishSystem(HwTools*, EntsoeApi*, EnergyAccounting*) { return false; }; + virtual bool publishRaw(String data) { return false; }; + + virtual ~AmsMqttHandler() { + if(mqttClient != NULL) { + mqttClient->stop(); + delete mqttClient; + } + }; + +protected: + RemoteDebug* debugger; + MqttConfig mqttConfig; + MQTTClient mqtt = MQTTClient(128); + WiFiClient *mqttClient = NULL; + WiFiClientSecure *mqttSecureClient = NULL; + char* json; + uint16_t BufferSize = 2048; +}; + +#endif diff --git a/lib/AmsMqttHandler/src/AmsMqttHandler.cpp b/lib/AmsMqttHandler/src/AmsMqttHandler.cpp new file mode 100644 index 00000000..9a1a36aa --- /dev/null +++ b/lib/AmsMqttHandler/src/AmsMqttHandler.cpp @@ -0,0 +1,152 @@ +#include "AmsMqttHandler.h" +#include "FirmwareVersion.h" +#include "AmsStorage.h" +#include "LittleFS.h" + +bool AmsMqttHandler::connect() { + time_t epoch = time(nullptr); + + if(mqttConfig.ssl) { + if(epoch < FirmwareVersion::BuildEpoch) { + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("NTP not ready for MQTT SSL")); + return false; + } + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("MQTT SSL is configured (%dkb free heap)"), ESP.getFreeHeap()); + if(mqttSecureClient == NULL) { + mqttSecureClient = new WiFiClientSecure(); + #if defined(ESP8266) + mqttSecureClient->setBufferSizes(512, 512); + debugD_P(PSTR("ESP8266 firmware does not have enough memory...")); + return; + #endif + + if(LittleFS.begin()) { + File file; + + if(LittleFS.exists(FILE_MQTT_CA)) { + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("Found MQTT CA file (%dkb free heap)"), ESP.getFreeHeap()); + file = LittleFS.open(FILE_MQTT_CA, (char*) "r"); + #if defined(ESP8266) + BearSSL::X509List *serverTrustedCA = new BearSSL::X509List(file); + mqttSecureClient->setTrustAnchors(serverTrustedCA); + #elif defined(ESP32) + if(mqttSecureClient->loadCACert(file, file.size())) { + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("CA accepted")); + } else { + if(debugger->isActive(RemoteDebug::WARNING)) debugger->printf_P(PSTR("CA was rejected")); + delete mqttSecureClient; + mqttSecureClient = NULL; + return false; + } + #endif + file.close(); + + if(LittleFS.exists(FILE_MQTT_CERT) && LittleFS.exists(FILE_MQTT_KEY)) { + #if defined(ESP8266) + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap()); + file = LittleFS.open(FILE_MQTT_CERT, (char*) "r"); + BearSSL::X509List *serverCertList = new BearSSL::X509List(file); + file.close(); + + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap()); + file = LittleFS.open(FILE_MQTT_KEY, (char*) "r"); + BearSSL::PrivateKey *serverPrivKey = new BearSSL::PrivateKey(file); + file.close(); + + debugD_P(PSTR("Setting client certificates (%dkb free heap)"), ESP.getFreeHeap()); + mqttSecureClient->setClientRSACert(serverCertList, serverPrivKey); + #elif defined(ESP32) + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap()); + file = LittleFS.open(FILE_MQTT_CERT, (char*) "r"); + mqttSecureClient->loadCertificate(file, file.size()); + file.close(); + + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap()); + file = LittleFS.open(FILE_MQTT_KEY, (char*) "r"); + mqttSecureClient->loadPrivateKey(file, file.size()); + file.close(); + #endif + } + } else { + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("No CA, disabling certificate validation")); + mqttSecureClient->setInsecure(); + } + mqttClient = mqttSecureClient; + + LittleFS.end(); + if(debugger->isActive(RemoteDebug::DEBUG)) debugger->printf_P(PSTR("MQTT SSL setup complete (%dkb free heap)"), ESP.getFreeHeap()); + } + } + } + + if(mqttClient == NULL) { + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("No SSL, using client without SSL support")); + mqttClient = new WiFiClient(); + } + + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("Connecting to MQTT %s:%d"), mqttConfig.host, mqttConfig.port); + + mqtt.begin(mqttConfig.host, mqttConfig.port, *mqttClient); + + #if defined(ESP8266) + if(mqttSecureClient) { + time_t epoch = time(nullptr); + debugD_P(PSTR("Setting NTP time %lu for secure MQTT connection"), epoch); + mqttSecureClient->setX509Time(epoch); + } + #endif + + // Connect to a unsecure or secure MQTT server + if ((strlen(mqttConfig.username) == 0 && mqtt.connect(mqttConfig.clientId)) || + (strlen(mqttConfig.username) > 0 && mqtt.connect(mqttConfig.clientId, mqttConfig.username, mqttConfig.password))) { + if(debugger->isActive(RemoteDebug::INFO)) debugger->printf_P(PSTR("Successfully connected to MQTT!")); + return true; + } else { + if (debugger->isActive(RemoteDebug::ERROR)) { + debugger->printf_P(PSTR("Failed to connect to MQTT: %d"), mqtt.lastError()); + #if defined(ESP8266) + if(mqttSecureClient) { + mqttSecureClient->getLastSSLError((char*) commonBuffer, BUF_SIZE_COMMON); + Debug.println((char*) commonBuffer); + } + #endif + } + return false; + } +} + +void AmsMqttHandler::disconnect() { + mqtt.disconnect(); + mqtt.loop(); + delay(10); + yield(); + + if(mqttClient != NULL) { + mqttClient->stop(); + delete mqttClient; + mqttClient = NULL; + if(mqttSecureClient != NULL) { + mqttSecureClient = NULL; + } + } +} + +lwmqtt_err_t AmsMqttHandler::lastError() { + return mqtt.lastError(); +} + +bool AmsMqttHandler::connected() { + return mqtt.connected(); +} + +bool AmsMqttHandler::loop() { + bool ret = mqtt.loop(); + delay(10); + yield(); + #if defined(ESP32) + esp_task_wdt_reset(); + #elif defined(ESP8266) + ESP.wdtFeed(); + #endif + return ret; +} \ No newline at end of file diff --git a/lib/DomoticzMqttHandler/include/DomoticzMqttHandler.h b/lib/DomoticzMqttHandler/include/DomoticzMqttHandler.h index e0630ff4..1e22f2c6 100644 --- a/lib/DomoticzMqttHandler/include/DomoticzMqttHandler.h +++ b/lib/DomoticzMqttHandler/include/DomoticzMqttHandler.h @@ -6,13 +6,16 @@ class DomoticzMqttHandler : public AmsMqttHandler { public: - DomoticzMqttHandler(MQTTClient* mqtt, char* buf, DomoticzConfig config) : AmsMqttHandler(mqtt, buf) { + DomoticzMqttHandler(MqttConfig& mqttConfig, RemoteDebug* debugger, char* buf, DomoticzConfig config) : AmsMqttHandler(mqttConfig, debugger, buf) { this->config = config; }; bool publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi); bool publishTemperatures(AmsConfiguration*, HwTools*); bool publishPrices(EntsoeApi*); bool publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea); + bool publishRaw(String data); + + uint8_t getFormat(); private: DomoticzConfig config; diff --git a/lib/DomoticzMqttHandler/src/DomoticzMqttHandler.cpp b/lib/DomoticzMqttHandler/src/DomoticzMqttHandler.cpp index e0147ef0..53ebb509 100644 --- a/lib/DomoticzMqttHandler/src/DomoticzMqttHandler.cpp +++ b/lib/DomoticzMqttHandler/src/DomoticzMqttHandler.cpp @@ -14,7 +14,7 @@ bool DomoticzMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyA config.elidx, val ); - ret = mqtt->publish(F("domoticz/in"), json); + ret = mqtt.publish(F("domoticz/in"), json); } } @@ -28,7 +28,7 @@ bool DomoticzMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyA config.vl1idx, val ); - ret |= mqtt->publish(F("domoticz/in"), json); + ret |= mqtt.publish(F("domoticz/in"), json); } if (config.vl2idx > 0){ @@ -38,7 +38,7 @@ bool DomoticzMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyA config.vl2idx, val ); - ret |= mqtt->publish(F("domoticz/in"), json); + ret |= mqtt.publish(F("domoticz/in"), json); } if (config.vl3idx > 0){ @@ -48,7 +48,7 @@ bool DomoticzMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyA config.vl3idx, val ); - ret |= mqtt->publish(F("domoticz/in"), json); + ret |= mqtt.publish(F("domoticz/in"), json); } if (config.cl1idx > 0){ @@ -58,7 +58,7 @@ bool DomoticzMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyA config.cl1idx, val ); - ret |= mqtt->publish(F("domoticz/in"), json); + ret |= mqtt.publish(F("domoticz/in"), json); } return ret; } @@ -74,3 +74,11 @@ bool DomoticzMqttHandler::publishPrices(EntsoeApi* eapi) { bool DomoticzMqttHandler::publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea) { return false; } + +uint8_t DomoticzMqttHandler::getFormat() { + return 3; +} + +bool DomoticzMqttHandler::publishRaw(String data) { + return false; +} diff --git a/lib/HomeAssistantMqttHandler/include/HomeAssistantMqttHandler.h b/lib/HomeAssistantMqttHandler/include/HomeAssistantMqttHandler.h index 3157b9e2..e51643d2 100644 --- a/lib/HomeAssistantMqttHandler/include/HomeAssistantMqttHandler.h +++ b/lib/HomeAssistantMqttHandler/include/HomeAssistantMqttHandler.h @@ -7,12 +7,12 @@ class HomeAssistantMqttHandler : public AmsMqttHandler { public: - HomeAssistantMqttHandler(MQTTClient* mqtt, char* buf, const char* clientId, const char* topic, uint8_t boardType, HomeAssistantConfig config, HwTools* hw) : AmsMqttHandler(mqtt, buf) { - this->clientId = clientId; - this->topic = String(topic); + HomeAssistantMqttHandler(MqttConfig& mqttConfig, RemoteDebug* debugger, char* buf, uint8_t boardType, HomeAssistantConfig config, HwTools* hw) : AmsMqttHandler(mqttConfig, debugger, buf) { this->hw = hw; l1Init = l2Init = l2eInit = l3Init = l3eInit = l4Init = l4eInit = rtInit = rteInit = pInit = sInit = false; + topic = String(mqttConfig.publishTopic); + if(strlen(config.discoveryNameTag) > 0) { snprintf_P(buf, 128, PSTR("AMS reader (%s)"), config.discoveryNameTag); deviceName = String(buf); @@ -56,11 +56,13 @@ public: bool publishTemperatures(AmsConfiguration*, HwTools*); bool publishPrices(EntsoeApi*); bool publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea); + bool publishRaw(String data); -protected: - bool loop(); + uint8_t getFormat(); private: + String topic; + String deviceName; String deviceModel; String deviceUid; @@ -74,8 +76,6 @@ private: bool tInit[32] = {false}; bool prInit[38] = {false}; - String clientId; - String topic; HwTools* hw; bool publishList1(AmsData* data, EnergyAccounting* ea); diff --git a/lib/HomeAssistantMqttHandler/src/HomeAssistantMqttHandler.cpp b/lib/HomeAssistantMqttHandler/src/HomeAssistantMqttHandler.cpp index 18897833..c3475f05 100644 --- a/lib/HomeAssistantMqttHandler/src/HomeAssistantMqttHandler.cpp +++ b/lib/HomeAssistantMqttHandler/src/HomeAssistantMqttHandler.cpp @@ -16,7 +16,7 @@ #endif bool HomeAssistantMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi) { - if(topic.isEmpty() || !mqtt->connected()) + if(topic.isEmpty() || !mqtt.connected()) return false; if(data->getListType() >= 3) { // publish energy counts @@ -45,7 +45,7 @@ bool HomeAssistantMqttHandler::publishList1(AmsData* data, EnergyAccounting* ea) snprintf_P(json, BufferSize, HA1_JSON, data->getActiveImportPower() ); - return mqtt->publish(topic + "/power", json); + return mqtt.publish(topic + "/power", json); } bool HomeAssistantMqttHandler::publishList2(AmsData* data, EnergyAccounting* ea) { @@ -66,7 +66,7 @@ bool HomeAssistantMqttHandler::publishList2(AmsData* data, EnergyAccounting* ea) data->getL2Voltage(), data->getL3Voltage() ); - return mqtt->publish(topic + "/power", json); + return mqtt.publish(topic + "/power", json); } bool HomeAssistantMqttHandler::publishList3(AmsData* data, EnergyAccounting* ea) { @@ -79,7 +79,7 @@ bool HomeAssistantMqttHandler::publishList3(AmsData* data, EnergyAccounting* ea) data->getReactiveExportCounter(), data->getMeterTimestamp() ); - return mqtt->publish(topic + "/energy", json); + return mqtt.publish(topic + "/energy", json); } bool HomeAssistantMqttHandler::publishList4(AmsData* data, EnergyAccounting* ea) { @@ -110,7 +110,7 @@ bool HomeAssistantMqttHandler::publishList4(AmsData* data, EnergyAccounting* ea) data->getPowerFactor() == 0 ? 1 : data->getL2PowerFactor(), data->getPowerFactor() == 0 ? 1 : data->getL3PowerFactor() ); - return mqtt->publish(topic + "/power", json); + return mqtt.publish(topic + "/power", json); } String HomeAssistantMqttHandler::getMeterModel(AmsData* data) { @@ -146,7 +146,7 @@ bool HomeAssistantMqttHandler::publishRealtime(AmsData* data, EnergyAccounting* ea->getProducedThisMonth(), ea->getIncomeThisMonth() ); - return mqtt->publish(topic + "/realtime", json); + return mqtt.publish(topic + "/realtime", json); } @@ -174,13 +174,13 @@ bool HomeAssistantMqttHandler::publishTemperatures(AmsConfiguration* config, HwT } char* pos = buf+strlen(buf); snprintf_P(count == 0 ? pos : pos-1, 8, PSTR("}}")); - bool ret = mqtt->publish(topic + "/temperatures", buf); + bool ret = mqtt.publish(topic + "/temperatures", buf); loop(); return ret; } bool HomeAssistantMqttHandler::publishPrices(EntsoeApi* eapi) { - if(topic.isEmpty() || !mqtt->connected()) + if(topic.isEmpty() || !mqtt.connected()) return false; if(eapi->getValueForHour(0) == ENTSOE_NO_VALUE) return false; @@ -310,13 +310,13 @@ bool HomeAssistantMqttHandler::publishPrices(EntsoeApi* eapi) { ts3hr, ts6hr ); - bool ret = mqtt->publish(topic + "/prices", json, true, 0); + bool ret = mqtt.publish(topic + "/prices", json, true, 0); loop(); return ret; } bool HomeAssistantMqttHandler::publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea) { - if(topic.isEmpty() || !mqtt->connected()) + if(topic.isEmpty() || !mqtt.connected()) return false; publishSystemSensors(); @@ -324,14 +324,14 @@ bool HomeAssistantMqttHandler::publishSystem(HwTools* hw, EntsoeApi* eapi, Energ snprintf_P(json, BufferSize, JSONSYS_JSON, WiFi.macAddress().c_str(), - clientId.c_str(), + mqttConfig.clientId, (uint32_t) (millis64()/1000), hw->getVcc(), hw->getWifiRssi(), hw->getTemperature(), FirmwareVersion::VersionString ); - bool ret = mqtt->publish(topic + "/state", json); + bool ret = mqtt.publish(topic + "/state", json); loop(); return ret; } @@ -345,7 +345,7 @@ void HomeAssistantMqttHandler::publishSensor(const HomeAssistantSensor& sensor) snprintf_P(json, BufferSize, HADISCOVER_JSON, sensorNamePrefix.c_str(), sensor.name, - topic.c_str(), sensor.topic, + mqttConfig.publishTopic, sensor.topic, deviceUid.c_str(), uid.c_str(), deviceUid.c_str(), uid.c_str(), sensor.uom, @@ -363,7 +363,7 @@ void HomeAssistantMqttHandler::publishSensor(const HomeAssistantSensor& sensor) strlen_P(sensor.stacl) > 0 ? (char *) FPSTR(sensor.stacl) : "", strlen_P(sensor.stacl) > 0 ? "\"" : "" ); - mqtt->publish(discoveryTopic + deviceUid + "_" + uid.c_str() + "/config", json, true, 0); + mqtt.publish(discoveryTopic + deviceUid + "_" + uid.c_str() + "/config", json, true, 0); loop(); } @@ -540,14 +540,10 @@ void HomeAssistantMqttHandler::publishSystemSensors() { sInit = true; } -bool HomeAssistantMqttHandler::loop() { - bool ret = mqtt->loop(); - delay(10); - yield(); - #if defined(ESP32) - esp_task_wdt_reset(); - #elif defined(ESP8266) - ESP.wdtFeed(); - #endif - return ret; +uint8_t HomeAssistantMqttHandler::getFormat() { + return 4; +} + +bool HomeAssistantMqttHandler::publishRaw(String data) { + return false; } diff --git a/lib/JsonMqttHandler/include/JsonMqttHandler.h b/lib/JsonMqttHandler/include/JsonMqttHandler.h index 8bccdc3b..124ad12f 100644 --- a/lib/JsonMqttHandler/include/JsonMqttHandler.h +++ b/lib/JsonMqttHandler/include/JsonMqttHandler.h @@ -5,22 +5,18 @@ class JsonMqttHandler : public AmsMqttHandler { public: - JsonMqttHandler(MQTTClient* mqtt, char* buf, const char* clientId, const char* topic, HwTools* hw) : AmsMqttHandler(mqtt, buf) { - this->clientId = clientId; - this->topic = String(topic); + JsonMqttHandler(MqttConfig& mqttConfig, RemoteDebug* debugger, char* buf, HwTools* hw) : AmsMqttHandler(mqttConfig, debugger, buf) { this->hw = hw; }; bool publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi); bool publishTemperatures(AmsConfiguration*, HwTools*); bool publishPrices(EntsoeApi*); bool publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea); + bool publishRaw(String data); -protected: - bool loop(); + uint8_t getFormat(); private: - String clientId; - String topic; HwTools* hw; bool publishList1(AmsData* data, EnergyAccounting* ea); diff --git a/lib/JsonMqttHandler/src/JsonMqttHandler.cpp b/lib/JsonMqttHandler/src/JsonMqttHandler.cpp index 3bb6bdf6..dcfcc2ba 100644 --- a/lib/JsonMqttHandler/src/JsonMqttHandler.cpp +++ b/lib/JsonMqttHandler/src/JsonMqttHandler.cpp @@ -10,7 +10,7 @@ #include "json/jsonprices_json.h" bool JsonMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi) { - if(topic.isEmpty() || !mqtt->connected()) + if(strlen(mqttConfig.publishTopic) == 0 || !mqtt.connected()) return false; bool ret = false; @@ -31,7 +31,7 @@ bool JsonMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyAccou bool JsonMqttHandler::publishList1(AmsData* data, EnergyAccounting* ea) { snprintf_P(json, BufferSize, JSON1_JSON, WiFi.macAddress().c_str(), - clientId.c_str(), + mqttConfig.clientId, (uint32_t) (millis64()/1000), data->getPackageTimestamp(), hw->getVcc(), @@ -45,13 +45,13 @@ bool JsonMqttHandler::publishList1(AmsData* data, EnergyAccounting* ea) { ea->getProducedThisHour(), ea->getProducedToday() ); - return mqtt->publish(topic, json); + return mqtt.publish(mqttConfig.publishTopic, json); } bool JsonMqttHandler::publishList2(AmsData* data, EnergyAccounting* ea) { snprintf_P(json, BufferSize, JSON2_JSON, WiFi.macAddress().c_str(), - clientId.c_str(), + mqttConfig.clientId, (uint32_t) (millis64()/1000), data->getPackageTimestamp(), hw->getVcc(), @@ -77,13 +77,13 @@ bool JsonMqttHandler::publishList2(AmsData* data, EnergyAccounting* ea) { ea->getProducedThisHour(), ea->getProducedToday() ); - return mqtt->publish(topic, json); + return mqtt.publish(mqttConfig.publishTopic, json); } bool JsonMqttHandler::publishList3(AmsData* data, EnergyAccounting* ea) { snprintf_P(json, BufferSize, JSON3_JSON, WiFi.macAddress().c_str(), - clientId.c_str(), + mqttConfig.clientId, (uint32_t) (millis64()/1000), data->getPackageTimestamp(), hw->getVcc(), @@ -114,13 +114,13 @@ bool JsonMqttHandler::publishList3(AmsData* data, EnergyAccounting* ea) { ea->getProducedThisHour(), ea->getProducedToday() ); - return mqtt->publish(topic, json); + return mqtt.publish(mqttConfig.publishTopic, json); } bool JsonMqttHandler::publishList4(AmsData* data, EnergyAccounting* ea) { snprintf_P(json, BufferSize, JSON4_JSON, WiFi.macAddress().c_str(), - clientId.c_str(), + mqttConfig.clientId, (uint32_t) (millis64()/1000), data->getPackageTimestamp(), hw->getVcc(), @@ -161,7 +161,7 @@ bool JsonMqttHandler::publishList4(AmsData* data, EnergyAccounting* ea) { ea->getProducedThisHour(), ea->getProducedToday() ); - return mqtt->publish(topic, json); + return mqtt.publish(mqttConfig.publishTopic, json); } String JsonMqttHandler::getMeterModel(AmsData* data) { @@ -191,13 +191,13 @@ bool JsonMqttHandler::publishTemperatures(AmsConfiguration* config, HwTools* hw) } char* pos = json+strlen(json); snprintf_P(count == 0 ? pos : pos-1, 8, PSTR("}}")); - bool ret = mqtt->publish(topic, json); + bool ret = mqtt.publish(mqttConfig.publishTopic, json); loop(); return ret; } bool JsonMqttHandler::publishPrices(EntsoeApi* eapi) { - if(topic.isEmpty() || !mqtt->connected()) + if(strlen(mqttConfig.publishTopic) == 0 || !mqtt.connected()) return false; if(eapi->getValueForHour(0) == ENTSOE_NO_VALUE) return false; @@ -325,37 +325,33 @@ bool JsonMqttHandler::publishPrices(EntsoeApi* eapi) { ts3hr, ts6hr ); - bool ret = mqtt->publish(topic, json); + bool ret = mqtt.publish(mqttConfig.publishTopic, json); loop(); return ret; } bool JsonMqttHandler::publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea) { - if(topic.isEmpty() || !mqtt->connected()) + if(strlen(mqttConfig.publishTopic) == 0 || !mqtt.connected()) return false; snprintf_P(json, BufferSize, JSONSYS_JSON, WiFi.macAddress().c_str(), - clientId.c_str(), + mqttConfig.clientId, (uint32_t) (millis64()/1000), hw->getVcc(), hw->getWifiRssi(), hw->getTemperature(), FirmwareVersion::VersionString ); - bool ret = mqtt->publish(topic, json); + bool ret = mqtt.publish(mqttConfig.publishTopic, json); loop(); return ret; } -bool JsonMqttHandler::loop() { - bool ret = mqtt->loop(); - delay(10); - yield(); - #if defined(ESP32) - esp_task_wdt_reset(); - #elif defined(ESP8266) - ESP.wdtFeed(); - #endif - return ret; +uint8_t JsonMqttHandler::getFormat() { + return 0; +} + +bool JsonMqttHandler::publishRaw(String data) { + return false; } diff --git a/lib/RawMqttHandler/include/RawMqttHandler.h b/lib/RawMqttHandler/include/RawMqttHandler.h index 5ab9b1bb..bf20394e 100644 --- a/lib/RawMqttHandler/include/RawMqttHandler.h +++ b/lib/RawMqttHandler/include/RawMqttHandler.h @@ -5,21 +5,21 @@ class RawMqttHandler : public AmsMqttHandler { public: - RawMqttHandler(MQTTClient* mqtt, char* buf, const char* topic, bool full) : AmsMqttHandler(mqtt, buf) { - this->topic = String(topic); - this->full = full; + RawMqttHandler(MqttConfig& mqttConfig, RemoteDebug* debugger, char* buf) : AmsMqttHandler(mqttConfig, debugger, buf) { + full = mqttConfig.payloadFormat == 2; + topic = String(mqttConfig.publishTopic); }; bool publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi); bool publishTemperatures(AmsConfiguration*, HwTools*); bool publishPrices(EntsoeApi*); bool publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea); + bool publishRaw(String data); -protected: - bool loop(); + uint8_t getFormat(); private: - String topic; bool full; + String topic; bool publishList1(AmsData* data, AmsData* meterState); bool publishList2(AmsData* data, AmsData* meterState); diff --git a/lib/RawMqttHandler/src/RawMqttHandler.cpp b/lib/RawMqttHandler/src/RawMqttHandler.cpp index 792c59ea..269ee774 100644 --- a/lib/RawMqttHandler/src/RawMqttHandler.cpp +++ b/lib/RawMqttHandler/src/RawMqttHandler.cpp @@ -3,11 +3,11 @@ #include "Uptime.h" bool RawMqttHandler::publish(AmsData* data, AmsData* meterState, EnergyAccounting* ea, EntsoeApi* eapi) { - if(topic.isEmpty() || !mqtt->connected()) + if(topic.isEmpty() || !mqtt.connected()) return false; if(data->getPackageTimestamp() > 0) { - mqtt->publish(topic + "/meter/dlms/timestamp", String(data->getPackageTimestamp())); + mqtt.publish(topic + "/meter/dlms/timestamp", String(data->getPackageTimestamp())); } switch(data->getListType()) { case 4: @@ -32,7 +32,7 @@ bool RawMqttHandler::publish(AmsData* data, AmsData* meterState, EnergyAccountin bool RawMqttHandler::publishList1(AmsData* data, AmsData* meterState) { if(full || meterState->getActiveImportPower() != data->getActiveImportPower()) { - mqtt->publish(topic + "/meter/import/active", String(data->getActiveImportPower())); + mqtt.publish(topic + "/meter/import/active", String(data->getActiveImportPower())); } return true; } @@ -40,101 +40,101 @@ bool RawMqttHandler::publishList1(AmsData* data, AmsData* meterState) { bool RawMqttHandler::publishList2(AmsData* data, AmsData* meterState) { // Only send data if changed. ID and Type is sent on the 10s interval only if changed if(full || meterState->getMeterId() != data->getMeterId()) { - mqtt->publish(topic + "/meter/id", data->getMeterId()); + mqtt.publish(topic + "/meter/id", data->getMeterId()); } if(full || meterState->getMeterModel() != data->getMeterModel()) { - mqtt->publish(topic + "/meter/type", data->getMeterModel()); + mqtt.publish(topic + "/meter/type", data->getMeterModel()); } if(full || meterState->getL1Current() != data->getL1Current()) { - mqtt->publish(topic + "/meter/l1/current", String(data->getL1Current(), 2)); + mqtt.publish(topic + "/meter/l1/current", String(data->getL1Current(), 2)); } if(full || meterState->getL1Voltage() != data->getL1Voltage()) { - mqtt->publish(topic + "/meter/l1/voltage", String(data->getL1Voltage(), 2)); + mqtt.publish(topic + "/meter/l1/voltage", String(data->getL1Voltage(), 2)); } if(full || meterState->getL2Current() != data->getL2Current()) { - mqtt->publish(topic + "/meter/l2/current", String(data->getL2Current(), 2)); + mqtt.publish(topic + "/meter/l2/current", String(data->getL2Current(), 2)); } if(full || meterState->getL2Voltage() != data->getL2Voltage()) { - mqtt->publish(topic + "/meter/l2/voltage", String(data->getL2Voltage(), 2)); + mqtt.publish(topic + "/meter/l2/voltage", String(data->getL2Voltage(), 2)); } if(full || meterState->getL3Current() != data->getL3Current()) { - mqtt->publish(topic + "/meter/l3/current", String(data->getL3Current(), 2)); + mqtt.publish(topic + "/meter/l3/current", String(data->getL3Current(), 2)); } if(full || meterState->getL3Voltage() != data->getL3Voltage()) { - mqtt->publish(topic + "/meter/l3/voltage", String(data->getL3Voltage(), 2)); + mqtt.publish(topic + "/meter/l3/voltage", String(data->getL3Voltage(), 2)); } if(full || meterState->getReactiveExportPower() != data->getReactiveExportPower()) { - mqtt->publish(topic + "/meter/export/reactive", String(data->getReactiveExportPower())); + mqtt.publish(topic + "/meter/export/reactive", String(data->getReactiveExportPower())); } if(full || meterState->getActiveExportPower() != data->getActiveExportPower()) { - mqtt->publish(topic + "/meter/export/active", String(data->getActiveExportPower())); + mqtt.publish(topic + "/meter/export/active", String(data->getActiveExportPower())); } if(full || meterState->getReactiveImportPower() != data->getReactiveImportPower()) { - mqtt->publish(topic + "/meter/import/reactive", String(data->getReactiveImportPower())); + mqtt.publish(topic + "/meter/import/reactive", String(data->getReactiveImportPower())); } return true; } bool RawMqttHandler::publishList3(AmsData* data, AmsData* meterState) { // ID and type belongs to List 2, but I see no need to send that every 10s - mqtt->publish(topic + "/meter/id", data->getMeterId(), true, 0); - mqtt->publish(topic + "/meter/type", data->getMeterModel(), true, 0); - mqtt->publish(topic + "/meter/clock", String(data->getMeterTimestamp())); - mqtt->publish(topic + "/meter/import/reactive/accumulated", String(data->getReactiveImportCounter(), 3), true, 0); - mqtt->publish(topic + "/meter/import/active/accumulated", String(data->getActiveImportCounter(), 3), true, 0); - mqtt->publish(topic + "/meter/export/reactive/accumulated", String(data->getReactiveExportCounter(), 3), true, 0); - mqtt->publish(topic + "/meter/export/active/accumulated", String(data->getActiveExportCounter(), 3), true, 0); + mqtt.publish(topic + "/meter/id", data->getMeterId(), true, 0); + mqtt.publish(topic + "/meter/type", data->getMeterModel(), true, 0); + mqtt.publish(topic + "/meter/clock", String(data->getMeterTimestamp())); + mqtt.publish(topic + "/meter/import/reactive/accumulated", String(data->getReactiveImportCounter(), 3), true, 0); + mqtt.publish(topic + "/meter/import/active/accumulated", String(data->getActiveImportCounter(), 3), true, 0); + mqtt.publish(topic + "/meter/export/reactive/accumulated", String(data->getReactiveExportCounter(), 3), true, 0); + mqtt.publish(topic + "/meter/export/active/accumulated", String(data->getActiveExportCounter(), 3), true, 0); return true; } bool RawMqttHandler::publishList4(AmsData* data, AmsData* meterState) { if(full || meterState->getL1ActiveImportPower() != data->getL1ActiveImportPower()) { - mqtt->publish(topic + "/meter/import/l1", String(data->getL1ActiveImportPower(), 2)); + mqtt.publish(topic + "/meter/import/l1", String(data->getL1ActiveImportPower(), 2)); } if(full || meterState->getL2ActiveImportPower() != data->getL2ActiveImportPower()) { - mqtt->publish(topic + "/meter/import/l2", String(data->getL2ActiveImportPower(), 2)); + mqtt.publish(topic + "/meter/import/l2", String(data->getL2ActiveImportPower(), 2)); } if(full || meterState->getL3ActiveImportPower() != data->getL3ActiveImportPower()) { - mqtt->publish(topic + "/meter/import/l3", String(data->getL3ActiveImportPower(), 2)); + mqtt.publish(topic + "/meter/import/l3", String(data->getL3ActiveImportPower(), 2)); } if(full || meterState->getL1ActiveExportPower() != data->getL1ActiveExportPower()) { - mqtt->publish(topic + "/meter/export/l1", String(data->getL1ActiveExportPower(), 2)); + mqtt.publish(topic + "/meter/export/l1", String(data->getL1ActiveExportPower(), 2)); } if(full || meterState->getL2ActiveExportPower() != data->getL2ActiveExportPower()) { - mqtt->publish(topic + "/meter/export/l2", String(data->getL2ActiveExportPower(), 2)); + mqtt.publish(topic + "/meter/export/l2", String(data->getL2ActiveExportPower(), 2)); } if(full || meterState->getL3ActiveExportPower() != data->getL3ActiveExportPower()) { - mqtt->publish(topic + "/meter/export/l3", String(data->getL3ActiveExportPower(), 2)); + mqtt.publish(topic + "/meter/export/l3", String(data->getL3ActiveExportPower(), 2)); } if(full || meterState->getPowerFactor() != data->getPowerFactor()) { - mqtt->publish(topic + "/meter/powerfactor", String(data->getPowerFactor(), 2)); + mqtt.publish(topic + "/meter/powerfactor", String(data->getPowerFactor(), 2)); } if(full || meterState->getL1PowerFactor() != data->getL1PowerFactor()) { - mqtt->publish(topic + "/meter/l1/powerfactor", String(data->getL1PowerFactor(), 2)); + mqtt.publish(topic + "/meter/l1/powerfactor", String(data->getL1PowerFactor(), 2)); } if(full || meterState->getL2PowerFactor() != data->getL2PowerFactor()) { - mqtt->publish(topic + "/meter/l2/powerfactor", String(data->getL2PowerFactor(), 2)); + mqtt.publish(topic + "/meter/l2/powerfactor", String(data->getL2PowerFactor(), 2)); } if(full || meterState->getL3PowerFactor() != data->getL3PowerFactor()) { - mqtt->publish(topic + "/meter/l3/powerfactor", String(data->getL3PowerFactor(), 2)); + mqtt.publish(topic + "/meter/l3/powerfactor", String(data->getL3PowerFactor(), 2)); } return true; } bool RawMqttHandler::publishRealtime(EnergyAccounting* ea) { - mqtt->publish(topic + "/realtime/import/hour", String(ea->getUseThisHour(), 3)); - mqtt->publish(topic + "/realtime/import/day", String(ea->getUseToday(), 2)); - mqtt->publish(topic + "/realtime/import/month", String(ea->getUseThisMonth(), 1)); + mqtt.publish(topic + "/realtime/import/hour", String(ea->getUseThisHour(), 3)); + mqtt.publish(topic + "/realtime/import/day", String(ea->getUseToday(), 2)); + mqtt.publish(topic + "/realtime/import/month", String(ea->getUseThisMonth(), 1)); uint8_t peakCount = ea->getConfig()->hours; if(peakCount > 5) peakCount = 5; for(uint8_t i = 1; i <= peakCount; i++) { - mqtt->publish(topic + "/realtime/import/peak/" + String(i, 10), String(ea->getPeak(i).value / 100.0, 10), true, 0); + mqtt.publish(topic + "/realtime/import/peak/" + String(i, 10), String(ea->getPeak(i).value / 100.0, 10), true, 0); } - mqtt->publish(topic + "/realtime/import/threshold", String(ea->getCurrentThreshold(), 10), true, 0); - mqtt->publish(topic + "/realtime/import/monthmax", String(ea->getMonthMax(), 3), true, 0); - mqtt->publish(topic + "/realtime/export/hour", String(ea->getProducedThisHour(), 3)); - mqtt->publish(topic + "/realtime/export/day", String(ea->getProducedToday(), 2)); - mqtt->publish(topic + "/realtime/export/month", String(ea->getProducedThisMonth(), 1)); + mqtt.publish(topic + "/realtime/import/threshold", String(ea->getCurrentThreshold(), 10), true, 0); + mqtt.publish(topic + "/realtime/import/monthmax", String(ea->getMonthMax(), 3), true, 0); + mqtt.publish(topic + "/realtime/export/hour", String(ea->getProducedThisHour(), 3)); + mqtt.publish(topic + "/realtime/export/day", String(ea->getProducedToday(), 2)); + mqtt.publish(topic + "/realtime/export/month", String(ea->getProducedThisMonth(), 1)); return true; } @@ -144,7 +144,7 @@ bool RawMqttHandler::publishTemperatures(AmsConfiguration* config, HwTools* hw) TempSensorData* data = hw->getTempSensorData(i); if(data != NULL && data->lastValidRead > -85) { if(data->changed || full) { - mqtt->publish(topic + "/temperature/" + toHex(data->address), String(data->lastValidRead, 2)); + mqtt.publish(topic + "/temperature/" + toHex(data->address), String(data->lastValidRead, 2)); data->changed = false; } } @@ -153,7 +153,7 @@ bool RawMqttHandler::publishTemperatures(AmsConfiguration* config, HwTools* hw) } bool RawMqttHandler::publishPrices(EntsoeApi* eapi) { - if(topic.isEmpty() || !mqtt->connected()) + if(topic.isEmpty() || !mqtt.connected()) return false; if(eapi->getValueForHour(0) == ENTSOE_NO_VALUE) return false; @@ -236,58 +236,54 @@ bool RawMqttHandler::publishPrices(EntsoeApi* eapi) { for(int i = 0; i < 34; i++) { float val = values[i]; if(val == ENTSOE_NO_VALUE) { - mqtt->publish(topic + "/price/" + String(i), "", true, 0); + mqtt.publish(topic + "/price/" + String(i), "", true, 0); } else { - mqtt->publish(topic + "/price/" + String(i), String(val, 4), true, 0); + mqtt.publish(topic + "/price/" + String(i), String(val, 4), true, 0); } - mqtt->loop(); + mqtt.loop(); delay(10); } if(min != INT16_MAX) { - mqtt->publish(topic + "/price/min", String(min, 4), true, 0); + mqtt.publish(topic + "/price/min", String(min, 4), true, 0); } if(max != INT16_MIN) { - mqtt->publish(topic + "/price/max", String(max, 4), true, 0); + mqtt.publish(topic + "/price/max", String(max, 4), true, 0); } if(min1hrIdx != -1) { - mqtt->publish(topic + "/price/cheapest/1hr", String(ts1hr), true, 0); + mqtt.publish(topic + "/price/cheapest/1hr", String(ts1hr), true, 0); } if(min3hrIdx != -1) { - mqtt->publish(topic + "/price/cheapest/3hr", String(ts3hr), true, 0); + mqtt.publish(topic + "/price/cheapest/3hr", String(ts3hr), true, 0); } if(min6hrIdx != -1) { - mqtt->publish(topic + "/price/cheapest/6hr", String(ts6hr), true, 0); + mqtt.publish(topic + "/price/cheapest/6hr", String(ts6hr), true, 0); } return true; } bool RawMqttHandler::publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea) { - if(topic.isEmpty() || !mqtt->connected()) + if(topic.isEmpty() || !mqtt.connected()) return false; - mqtt->publish(topic + "/id", WiFi.macAddress(), true, 0); - mqtt->publish(topic + "/uptime", String((uint32_t) (millis64()/1000))); + mqtt.publish(topic + "/id", WiFi.macAddress(), true, 0); + mqtt.publish(topic + "/uptime", String((uint32_t) (millis64()/1000))); float vcc = hw->getVcc(); if(vcc > 0) { - mqtt->publish(topic + "/vcc", String(vcc, 2)); + mqtt.publish(topic + "/vcc", String(vcc, 2)); } - mqtt->publish(topic + "/mem", String(ESP.getFreeHeap())); - mqtt->publish(topic + "/rssi", String(hw->getWifiRssi())); + mqtt.publish(topic + "/mem", String(ESP.getFreeHeap())); + mqtt.publish(topic + "/rssi", String(hw->getWifiRssi())); if(hw->getTemperature() > -85) { - mqtt->publish(topic + "/temperature", String(hw->getTemperature(), 2)); + mqtt.publish(topic + "/temperature", String(hw->getTemperature(), 2)); } return true; } -bool RawMqttHandler::loop() { - bool ret = mqtt->loop(); - delay(10); - yield(); - #if defined(ESP32) - esp_task_wdt_reset(); - #elif defined(ESP8266) - ESP.wdtFeed(); - #endif - return ret; +uint8_t RawMqttHandler::getFormat() { + return full ? 3 : 2; +} + +bool RawMqttHandler::publishRaw(String data) { + return false; } diff --git a/lib/SvelteUi/include/AmsWebServer.h b/lib/SvelteUi/include/AmsWebServer.h index db2666c3..a38e250f 100644 --- a/lib/SvelteUi/include/AmsWebServer.h +++ b/lib/SvelteUi/include/AmsWebServer.h @@ -2,7 +2,7 @@ #define _AMSWEBSERVER_h #include "Arduino.h" -#include +#include "AmsMqttHandler.h" #include "AmsConfiguration.h" #include "HwTools.h" #include "AmsData.h" @@ -39,6 +39,7 @@ public: void setMqttEnabled(bool); void setEntsoeApi(EntsoeApi* eapi); void setPriceSettings(String region, String currency); + void setMqttHandler(AmsMqttHandler* mqttHandler); private: RemoteDebug* debugger; @@ -54,7 +55,7 @@ private: AmsData* meterState; AmsDataStorage* ds; EnergyAccounting* ea = NULL; - MQTTClient* mqtt = NULL; + AmsMqttHandler* mqttHandler = NULL; bool uploading = false; File file; bool performRestart = false; diff --git a/lib/SvelteUi/src/AmsWebServer.cpp b/lib/SvelteUi/src/AmsWebServer.cpp index 4a9c477c..caf1aefb 100644 --- a/lib/SvelteUi/src/AmsWebServer.cpp +++ b/lib/SvelteUi/src/AmsWebServer.cpp @@ -35,7 +35,7 @@ #if defined(ESP32) #include #include -#include +#include #endif @@ -124,11 +124,6 @@ void AmsWebServer::setup(AmsConfiguration* config, GpioConfig* gpioConfig, Meter mqttEnabled = strlen(mqttConfig.host) > 0; } - -void AmsWebServer::setMqtt(MQTTClient* mqtt) { - this->mqtt = mqtt; -} - void AmsWebServer::setTimezone(Timezone* tz) { this->tz = tz; } @@ -136,6 +131,9 @@ void AmsWebServer::setTimezone(Timezone* tz) { void AmsWebServer::setMqttEnabled(bool enabled) { mqttEnabled = enabled; } +void AmsWebServer::setMqttHandler(AmsMqttHandler* mqttHandler) { + this->mqttHandler = mqttHandler; +} void AmsWebServer::setEntsoeApi(EntsoeApi* eapi) { this->eapi = eapi; @@ -418,9 +416,9 @@ void AmsWebServer::dataJson() { uint8_t mqttStatus; if(!mqttEnabled) { mqttStatus = 0; - } else if(mqtt != NULL && mqtt->connected()) { + } else if(mqttHandler != NULL && mqttHandler->connected()) { mqttStatus = 1; - } else if(mqtt != NULL && mqtt->lastError() == 0) { + } else if(mqttHandler != NULL && mqttHandler->lastError() == 0) { mqttStatus = 2; } else { mqttStatus = 3; @@ -467,7 +465,7 @@ void AmsWebServer::dataJson() { hanStatus, wifiStatus, mqttStatus, - mqtt == NULL ? 0 : (int) mqtt->lastError(), + mqttHandler == NULL ? 0 : (int) mqttHandler->lastError(), price == ENTSOE_NO_VALUE ? "null" : String(price, 2).c_str(), meterState->getMeterType(), meterConfig->distributionSystem, diff --git a/platformio.ini b/platformio.ini index 4310874b..ae1f0bb9 100755 --- a/platformio.ini +++ b/platformio.ini @@ -2,7 +2,7 @@ extra_configs = platformio-user.ini [common] -lib_deps = EEPROM, LittleFS, DNSServer, https://github.com/256dpi/arduino-mqtt.git, OneWireNg@0.10.0, DallasTemperature@3.9.1, EspSoftwareSerial@6.14.1, https://github.com/gskjold/RemoteDebug.git, Time@1.6.1, Timezone@1.2.4, FirmwareVersion, AmsConfiguration, AmsData, AmsDataStorage, HwTools, Uptime, AmsDecoder, EntsoePriceApi, EnergyAccounting, RawMqttHandler, JsonMqttHandler, DomoticzMqttHandler, HomeAssistantMqttHandler, SvelteUi +lib_deps = EEPROM, LittleFS, DNSServer, https://github.com/256dpi/arduino-mqtt.git, OneWireNg@0.10.0, DallasTemperature@3.9.1, EspSoftwareSerial@6.14.1, https://github.com/gskjold/RemoteDebug.git, Time@1.6.1, Timezone@1.2.4, FirmwareVersion, AmsConfiguration, AmsData, AmsDataStorage, HwTools, Uptime, AmsDecoder, EntsoePriceApi, EnergyAccounting, AmsMqttHandler, RawMqttHandler, JsonMqttHandler, DomoticzMqttHandler, HomeAssistantMqttHandler, SvelteUi lib_ignore = OneWire extra_scripts = pre:scripts/addversion.py diff --git a/src/AmsToMqttBridge.cpp b/src/AmsToMqttBridge.cpp index 134232ff..0fb41fa6 100644 --- a/src/AmsToMqttBridge.cpp +++ b/src/AmsToMqttBridge.cpp @@ -62,12 +62,13 @@ ADC_MODE(ADC_VCC); #include "RawMqttHandler.h" #include "DomoticzMqttHandler.h" #include "HomeAssistantMqttHandler.h" +#include "PassthroughMqttHandler.h" #include "Uptime.h" #include "RemoteDebug.h" -#define debugV_P(x, ...) if (Debug.isActive(Debug.VERBOSE)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();} +#define debugV_P(x, ...) if (Debug.isActive(Debug.VERBOSE)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();} #define debugD_P(x, ...) if (Debug.isActive(Debug.DEBUG)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();} #define debugI_P(x, ...) if (Debug.isActive(Debug.INFO)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();} #define debugW_P(x, ...) if (Debug.isActive(Debug.WARNING)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();} @@ -103,9 +104,6 @@ Timezone* tz = NULL; AmsWebServer ws(commonBuffer, &Debug, &hw); -MQTTClient *mqtt = NULL; -WiFiClient *mqttClient = NULL; -WiFiClientSecure *mqttSecureClient = NULL; AmsMqttHandler* mqttHandler = NULL; Stream *hanSerial; @@ -116,8 +114,6 @@ uint8_t rxBufferErrors = 0; SystemConfig sysConfig; GpioConfig gpioConfig; MeterConfig meterConfig; -bool mqttEnabled = false; -String topic = "ams"; AmsData meterState; bool ntpEnabled = false; @@ -546,14 +542,13 @@ void loop() { } #endif - if (mqttEnabled || config.isMqttChanged()) { - if(mqtt == NULL || !mqtt->connected() || config.isMqttChanged()) { + if (mqttHandler != NULL || config.isMqttChanged()) { + if(mqttHandler == NULL || !mqttHandler->connected() || config.isMqttChanged()) { MQTT_connect(); config.ackMqttChange(); } - } else if(mqtt != NULL && mqtt->connected()) { - mqttClient->stop(); - mqtt->disconnect(); + } else if(mqttHandler != NULL) { + mqttHandler->disconnect(); } try { @@ -568,9 +563,9 @@ void loop() { debugW_P(PSTR("Used %dms to handle web"), millis()-start); } } - if(mqtt != NULL) { + if(mqttHandler != NULL) { start = millis(); - mqtt->loop(); + mqttHandler->loop(); delay(10); // Needed to preserve power. After adding this, the voltage is super smooth on a HAN powered device end = millis(); if(end - start > 1000) { @@ -703,7 +698,7 @@ void handleSystem(unsigned long now) { unsigned long start, end; if(now - lastSysupdate > 60000) { start = millis(); - if(mqtt != NULL && mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED && mqtt->connected() && !topic.isEmpty()) { + if(mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED) { mqttHandler->publishSystem(&hw, eapi, &ea); } lastSysupdate = now; @@ -741,7 +736,7 @@ void handleTemperature(unsigned long now) { if(hw.updateTemperatures()) { lastTemperatureRead = now; - if(mqtt != NULL && mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED && mqtt->connected() && !topic.isEmpty()) { + if(mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED) { mqttHandler->publishTemperatures(&config, &hw); } } @@ -756,7 +751,7 @@ void handlePriceApi(unsigned long now) { unsigned long start, end; if(eapi != NULL && ntpEnabled) { start = millis(); - if(eapi->loop() && mqtt != NULL && mqttHandler != NULL && mqtt->connected()) { + if(eapi->loop() && mqttHandler != NULL) { end = millis(); if(end - start > 1000) { debugW_P(PSTR("Used %dms to update prices"), millis()-start); @@ -1078,7 +1073,7 @@ void errorBlink() { } break; case 1: - if(mqttEnabled && mqtt != NULL && mqtt->lastError() != 0) { + if(mqttHandler != NULL && mqttHandler->lastError() != 0) { debugW_P(PSTR("MQTT connection not available, double blink")); hw.ledBlink(LED_RED, 2); // If MQTT error, blink twice return; @@ -1221,10 +1216,8 @@ bool readHanPort() { meterState.setLastError(pos); printHanReadError(pos); len += hanSerial->readBytes(hanBuffer+len, BUF_SIZE_HAN-len); - if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) { - mqtt->publish(topic.c_str(), toHex(hanBuffer+pos, len)); - mqtt->loop(); - delay(10); + if(mqttHandler != NULL) { + mqttHandler->publishRaw(toHex(hanBuffer+pos, len)); } while(hanSerial->available()) hanSerial->read(); // Make sure it is all empty, in case we overflowed buffer above len = 0; @@ -1242,10 +1235,8 @@ bool readHanPort() { if(maxDetectedPayloadSize < pos) maxDetectedPayloadSize = pos; if(ctx.type == DATA_TAG_DLMS) { // If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT - if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) { - mqtt->publish(topic.c_str(), toHex((byte*) payload, ctx.length)); - mqtt->loop(); - delay(10); + if(mqttHandler != NULL) { + mqttHandler->publishRaw(toHex((byte*) payload, ctx.length)); } debugV_P(PSTR("Using application data:")); @@ -1300,7 +1291,7 @@ void handleDataSuccess(AmsData* data) { if(!hw.ledBlink(LED_GREEN, 1)) hw.ledBlink(LED_INTERNAL, 1); - if(mqttEnabled && mqttHandler != NULL && mqtt != NULL) { + if(mqttHandler != NULL) { #if defined(ESP32) esp_task_wdt_reset(); #elif defined(ESP8266) @@ -1308,7 +1299,6 @@ void handleDataSuccess(AmsData* data) { #endif yield(); if(mqttHandler->publish(data, &meterState, &ea, eapi)) { - mqtt->loop(); delay(10); } } @@ -1418,23 +1408,8 @@ unsigned long lastWifiRetry = -WIFI_CONNECTION_TIMEOUT; void WiFi_disconnect(unsigned long timeout) { if (Debug.isActive(RemoteDebug::INFO)) debugI_P(PSTR("Not connected to WiFi, closing resources")); - if(mqtt != NULL) { - mqtt->disconnect(); - mqtt->loop(); - delay(10); - yield(); - delete mqtt; - mqtt = NULL; - ws.setMqtt(NULL); - } - - if(mqttClient != NULL) { - mqttClient->stop(); - delete mqttClient; - mqttClient = NULL; - if(mqttSecureClient != NULL) { - mqttSecureClient = NULL; - } + if(mqttHandler != NULL) { + mqttHandler->disconnect(); } #if defined(ESP8266) @@ -1638,21 +1613,10 @@ void WiFi_post_connect() { MqttConfig mqttConfig; if(config.getMqttConfig(mqttConfig)) { - mqttEnabled = strlen(mqttConfig.host) > 0; - ws.setMqttEnabled(mqttEnabled); + ws.setMqttEnabled(strlen(mqttConfig.host) > 0); } } -void mqttMessageReceived(String &topic, String &payload) { - debugI_P(PSTR("Received message for topic %s"), topic.c_str() ); - //if(meterConfig.source == METER_SOURCE_MQTT) { - //DataParserContext ctx = {static_cast(payload.length()/2)}; - //fromHex(hanBuffer, payload, ctx.length); - //uint16_t pos = unwrapData(hanBuffer, ctx); - // TODO: Run through DLMS/DMSR parser and apply AmsData - //} -} - int16_t unwrapData(uint8_t *buf, DataParserContext &context) { int16_t ret = 0; bool doRet = false; @@ -1707,19 +1671,15 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) { case DATA_TAG_HDLC: debugV_P(PSTR("HDLC frame:")); // If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT - if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) { - mqtt->publish(topic.c_str(), toHex(buf, curLen)); - mqtt->loop(); - delay(10); + if(mqttHandler != NULL) { + mqttHandler->publishRaw(toHex(buf, curLen)); } break; case DATA_TAG_MBUS: debugV_P(PSTR("MBUS frame:")); // If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT - if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) { - mqtt->publish(topic.c_str(), toHex(buf, curLen)); - mqtt->loop(); - delay(10); + if(mqttHandler != NULL) { + mqttHandler->publishRaw(toHex(buf, curLen)); } break; case DATA_TAG_GBT: @@ -1736,10 +1696,8 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) { break; case DATA_TAG_DSMR: debugV_P(PSTR("DSMR frame:")); - if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) { - mqtt->publish(topic.c_str(), (char*) buf); - mqtt->loop(); - delay(10); + if(mqttHandler != NULL) { + mqttHandler->publishRaw(String((char*)buf)); } break; } @@ -1773,191 +1731,56 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) { unsigned long lastMqttRetry = -10000; void MQTT_connect() { + if(millis() - lastMqttRetry < (config.isMqttChanged() ? 5000 : 30000)) { + yield(); + return; + } + lastMqttRetry = millis(); + MqttConfig mqttConfig; if(!config.getMqttConfig(mqttConfig) || strlen(mqttConfig.host) == 0) { if(Debug.isActive(RemoteDebug::WARNING)) debugW_P(PSTR("No MQTT config")); - mqttEnabled = false; ws.setMqttEnabled(false); return; } - if(mqtt != NULL) { - if(millis() - lastMqttRetry < (mqtt->lastError() == 0 || config.isMqttChanged() ? 5000 : 30000)) { - yield(); - return; - } - lastMqttRetry = millis(); - if(Debug.isActive(RemoteDebug::INFO)) { - debugD_P(PSTR("Disconnecting MQTT before connecting")); - } - - mqtt->disconnect(); - if(config.isMqttChanged()) { - if(mqttSecureClient != NULL) { - mqttSecureClient->stop(); - delete mqttSecureClient; - mqttSecureClient = NULL; - } else { - mqttClient->stop(); - } - mqttClient = NULL; - } - yield(); - } else { - mqtt = new MQTTClient(128); - mqtt->dropOverflow(true); - ws.setMqtt(mqtt); - } - - mqttEnabled = true; ws.setMqttEnabled(true); - topic = String(mqttConfig.publishTopic); - if(mqttHandler != NULL) { + if(mqttHandler != NULL && mqttHandler->getFormat() != mqttConfig.payloadFormat) { delete mqttHandler; mqttHandler = NULL; } - switch(mqttConfig.payloadFormat) { - case 0: - mqttHandler = new JsonMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.clientId, mqttConfig.publishTopic, &hw); - break; - case 1: - case 2: - mqttHandler = new RawMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.publishTopic, mqttConfig.payloadFormat == 2); - break; - case 3: - DomoticzConfig domo; - config.getDomoticzConfig(domo); - mqttHandler = new DomoticzMqttHandler(mqtt, (char*) commonBuffer, domo); - break; - case 4: - HomeAssistantConfig haconf; - config.getHomeAssistantConfig(haconf); - mqttHandler = new HomeAssistantMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.clientId, mqttConfig.publishTopic, sysConfig.boardType, haconf, &hw); - break; - } - - time_t epoch = time(nullptr); - if(mqttConfig.ssl) { - if(epoch < FirmwareVersion::BuildEpoch) { - debugI_P(PSTR("NTP not ready for MQTT SSL")); - return; - } - debugI_P(PSTR("MQTT SSL is configured (%dkb free heap)"), ESP.getFreeHeap()); - if(mqttSecureClient == NULL) { - mqttSecureClient = new WiFiClientSecure(); - #if defined(ESP8266) - mqttSecureClient->setBufferSizes(512, 512); - debugD_P(PSTR("ESP8266 firmware does not have enough memory...")); - return; - #endif - - if(LittleFS.begin()) { - File file; - - if(LittleFS.exists(FILE_MQTT_CA)) { - debugI_P(PSTR("Found MQTT CA file (%dkb free heap)"), ESP.getFreeHeap()); - file = LittleFS.open(FILE_MQTT_CA, (char*) "r"); - #if defined(ESP8266) - BearSSL::X509List *serverTrustedCA = new BearSSL::X509List(file); - mqttSecureClient->setTrustAnchors(serverTrustedCA); - #elif defined(ESP32) - if(mqttSecureClient->loadCACert(file, file.size())) { - debugI_P(PSTR("CA accepted")); - } else { - debugW_P(PSTR("CA was rejected")); - delete mqttSecureClient; - mqttSecureClient = NULL; - return; - } - #endif - file.close(); - - if(LittleFS.exists(FILE_MQTT_CERT) && LittleFS.exists(FILE_MQTT_KEY)) { - #if defined(ESP8266) - debugI_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap()); - file = LittleFS.open(FILE_MQTT_CERT, (char*) "r"); - BearSSL::X509List *serverCertList = new BearSSL::X509List(file); - file.close(); - - debugI_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap()); - file = LittleFS.open(FILE_MQTT_KEY, (char*) "r"); - BearSSL::PrivateKey *serverPrivKey = new BearSSL::PrivateKey(file); - file.close(); - - debugD_P(PSTR("Setting client certificates (%dkb free heap)"), ESP.getFreeHeap()); - mqttSecureClient->setClientRSACert(serverCertList, serverPrivKey); - #elif defined(ESP32) - debugI_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap()); - file = LittleFS.open(FILE_MQTT_CERT, (char*) "r"); - mqttSecureClient->loadCertificate(file, file.size()); - file.close(); - - debugI_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap()); - file = LittleFS.open(FILE_MQTT_KEY, (char*) "r"); - mqttSecureClient->loadPrivateKey(file, file.size()); - file.close(); - #endif - } - } else { - debugI_P(PSTR("No CA, disabling certificate validation")); - mqttSecureClient->setInsecure(); - } - mqttClient = mqttSecureClient; - - LittleFS.end(); - debugD_P(PSTR("MQTT SSL setup complete (%dkb free heap)"), ESP.getFreeHeap()); - } + if(mqttHandler == NULL) { + switch(mqttConfig.payloadFormat) { + case 0: + mqttHandler = new JsonMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, &hw); + break; + case 1: + case 2: + mqttHandler = new RawMqttHandler(mqttConfig, &Debug, (char*) commonBuffer); + break; + case 3: + DomoticzConfig domo; + config.getDomoticzConfig(domo); + mqttHandler = new DomoticzMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, domo); + break; + case 4: + HomeAssistantConfig haconf; + config.getHomeAssistantConfig(haconf); + mqttHandler = new HomeAssistantMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, sysConfig.boardType, haconf, &hw); + break; + case 255: + mqttHandler = new PassthroughMqttHandler(mqttConfig, &Debug, (char*) commonBuffer); + break; } } - - if(mqttClient == NULL) { - debugI_P(PSTR("No SSL, using client without SSL support")); - mqttClient = new WiFiClient(); + ws.setMqttHandler(mqttHandler); + + if(mqttHandler != NULL) { + mqttHandler->connect(); + mqttHandler->publishSystem(&hw, eapi, &ea); } - - if(Debug.isActive(RemoteDebug::INFO)) { - debugI_P(PSTR("Connecting to MQTT %s:%d"), mqttConfig.host, mqttConfig.port); - } - - mqtt->begin(mqttConfig.host, mqttConfig.port, *mqttClient); - - #if defined(ESP8266) - if(mqttSecureClient) { - time_t epoch = time(nullptr); - debugD_P(PSTR("Setting NTP time %lu for secure MQTT connection"), epoch); - mqttSecureClient->setX509Time(epoch); - } - #endif - - // Connect to a unsecure or secure MQTT server - if ((strlen(mqttConfig.username) == 0 && mqtt->connect(mqttConfig.clientId)) || - (strlen(mqttConfig.username) > 0 && mqtt->connect(mqttConfig.clientId, mqttConfig.username, mqttConfig.password))) { - if (Debug.isActive(RemoteDebug::INFO)) debugI_P(PSTR("Successfully connected to MQTT!")); - - if(mqttHandler != NULL) { - mqttHandler->publishSystem(&hw, eapi, &ea); - } - - // Subscribe to the chosen MQTT topic, if set in configuration - if (strlen(mqttConfig.subscribeTopic) > 0) { - mqtt->onMessage(mqttMessageReceived); - mqtt->subscribe(String(mqttConfig.subscribeTopic) + "/#"); - debugI_P(PSTR(" Subscribing to [%s]\n"), mqttConfig.subscribeTopic); - } - } else { - if (Debug.isActive(RemoteDebug::ERROR)) { - debugE_P(PSTR("Failed to connect to MQTT: %d"), mqtt->lastError()); - #if defined(ESP8266) - if(mqttSecureClient) { - mqttSecureClient->getLastSSLError((char*) commonBuffer, BUF_SIZE_COMMON); - Debug.println((char*) commonBuffer); - } - #endif - } - } - yield(); } void configFileParse() { diff --git a/src/PassthroughMqttHandler.cpp b/src/PassthroughMqttHandler.cpp new file mode 100644 index 00000000..d78b786f --- /dev/null +++ b/src/PassthroughMqttHandler.cpp @@ -0,0 +1,28 @@ +#include "PassthroughMqttHandler.h" + +bool PassthroughMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi) { + return false; +} + +bool PassthroughMqttHandler::publishTemperatures(AmsConfiguration*, HwTools*) { + return false; +} + +bool PassthroughMqttHandler::publishPrices(EntsoeApi*) { + return false; +} + +bool PassthroughMqttHandler::publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea) { + return false; +} + +bool PassthroughMqttHandler::publishRaw(String data) { + bool ret = mqtt.publish(mqttConfig.publishTopic, data); + loop(); + delay(10); + return ret; +} + +uint8_t PassthroughMqttHandler::getFormat() { + return 255; +} \ No newline at end of file diff --git a/src/PassthroughMqttHandler.h b/src/PassthroughMqttHandler.h new file mode 100644 index 00000000..188548ed --- /dev/null +++ b/src/PassthroughMqttHandler.h @@ -0,0 +1,17 @@ +#ifndef _PASSTHROUGHMQTTHANDLER_H +#define _PASSTHROUGHMQTTHANDLER_H + +#include "AmsMqttHandler.h" + +class PassthroughMqttHandler : public AmsMqttHandler { +public: + PassthroughMqttHandler(MqttConfig& mqttConfig, RemoteDebug* debugger, char* buf) : AmsMqttHandler(mqttConfig, debugger, buf) {}; + bool publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi); + bool publishTemperatures(AmsConfiguration*, HwTools*); + bool publishPrices(EntsoeApi*); + bool publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea); + bool publishRaw(String data); + + uint8_t getFormat(); +}; +#endif