Moved MQTT connection into handler

This commit is contained in:
Gunnar Skjold
2023-10-14 08:07:56 +02:00
parent f9b4680b9c
commit 7b025ddb01
17 changed files with 461 additions and 428 deletions

View File

@@ -62,12 +62,13 @@ ADC_MODE(ADC_VCC);
#include "RawMqttHandler.h"
#include "DomoticzMqttHandler.h"
#include "HomeAssistantMqttHandler.h"
#include "PassthroughMqttHandler.h"
#include "Uptime.h"
#include "RemoteDebug.h"
#define debugV_P(x, ...) if (Debug.isActive(Debug.VERBOSE)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
#define debugV_P(x, ...) if (Debug.isActive(Debug.VERBOSE)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
#define debugD_P(x, ...) if (Debug.isActive(Debug.DEBUG)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
#define debugI_P(x, ...) if (Debug.isActive(Debug.INFO)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
#define debugW_P(x, ...) if (Debug.isActive(Debug.WARNING)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
@@ -103,9 +104,6 @@ Timezone* tz = NULL;
AmsWebServer ws(commonBuffer, &Debug, &hw);
MQTTClient *mqtt = NULL;
WiFiClient *mqttClient = NULL;
WiFiClientSecure *mqttSecureClient = NULL;
AmsMqttHandler* mqttHandler = NULL;
Stream *hanSerial;
@@ -116,8 +114,6 @@ uint8_t rxBufferErrors = 0;
SystemConfig sysConfig;
GpioConfig gpioConfig;
MeterConfig meterConfig;
bool mqttEnabled = false;
String topic = "ams";
AmsData meterState;
bool ntpEnabled = false;
@@ -546,14 +542,13 @@ void loop() {
}
#endif
if (mqttEnabled || config.isMqttChanged()) {
if(mqtt == NULL || !mqtt->connected() || config.isMqttChanged()) {
if (mqttHandler != NULL || config.isMqttChanged()) {
if(mqttHandler == NULL || !mqttHandler->connected() || config.isMqttChanged()) {
MQTT_connect();
config.ackMqttChange();
}
} else if(mqtt != NULL && mqtt->connected()) {
mqttClient->stop();
mqtt->disconnect();
} else if(mqttHandler != NULL) {
mqttHandler->disconnect();
}
try {
@@ -568,9 +563,9 @@ void loop() {
debugW_P(PSTR("Used %dms to handle web"), millis()-start);
}
}
if(mqtt != NULL) {
if(mqttHandler != NULL) {
start = millis();
mqtt->loop();
mqttHandler->loop();
delay(10); // Needed to preserve power. After adding this, the voltage is super smooth on a HAN powered device
end = millis();
if(end - start > 1000) {
@@ -703,7 +698,7 @@ void handleSystem(unsigned long now) {
unsigned long start, end;
if(now - lastSysupdate > 60000) {
start = millis();
if(mqtt != NULL && mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED && mqtt->connected() && !topic.isEmpty()) {
if(mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED) {
mqttHandler->publishSystem(&hw, eapi, &ea);
}
lastSysupdate = now;
@@ -741,7 +736,7 @@ void handleTemperature(unsigned long now) {
if(hw.updateTemperatures()) {
lastTemperatureRead = now;
if(mqtt != NULL && mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED && mqtt->connected() && !topic.isEmpty()) {
if(mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED) {
mqttHandler->publishTemperatures(&config, &hw);
}
}
@@ -756,7 +751,7 @@ void handlePriceApi(unsigned long now) {
unsigned long start, end;
if(eapi != NULL && ntpEnabled) {
start = millis();
if(eapi->loop() && mqtt != NULL && mqttHandler != NULL && mqtt->connected()) {
if(eapi->loop() && mqttHandler != NULL) {
end = millis();
if(end - start > 1000) {
debugW_P(PSTR("Used %dms to update prices"), millis()-start);
@@ -1078,7 +1073,7 @@ void errorBlink() {
}
break;
case 1:
if(mqttEnabled && mqtt != NULL && mqtt->lastError() != 0) {
if(mqttHandler != NULL && mqttHandler->lastError() != 0) {
debugW_P(PSTR("MQTT connection not available, double blink"));
hw.ledBlink(LED_RED, 2); // If MQTT error, blink twice
return;
@@ -1221,10 +1216,8 @@ bool readHanPort() {
meterState.setLastError(pos);
printHanReadError(pos);
len += hanSerial->readBytes(hanBuffer+len, BUF_SIZE_HAN-len);
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(hanBuffer+pos, len));
mqtt->loop();
delay(10);
if(mqttHandler != NULL) {
mqttHandler->publishRaw(toHex(hanBuffer+pos, len));
}
while(hanSerial->available()) hanSerial->read(); // Make sure it is all empty, in case we overflowed buffer above
len = 0;
@@ -1242,10 +1235,8 @@ bool readHanPort() {
if(maxDetectedPayloadSize < pos) maxDetectedPayloadSize = pos;
if(ctx.type == DATA_TAG_DLMS) {
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex((byte*) payload, ctx.length));
mqtt->loop();
delay(10);
if(mqttHandler != NULL) {
mqttHandler->publishRaw(toHex((byte*) payload, ctx.length));
}
debugV_P(PSTR("Using application data:"));
@@ -1300,7 +1291,7 @@ void handleDataSuccess(AmsData* data) {
if(!hw.ledBlink(LED_GREEN, 1))
hw.ledBlink(LED_INTERNAL, 1);
if(mqttEnabled && mqttHandler != NULL && mqtt != NULL) {
if(mqttHandler != NULL) {
#if defined(ESP32)
esp_task_wdt_reset();
#elif defined(ESP8266)
@@ -1308,7 +1299,6 @@ void handleDataSuccess(AmsData* data) {
#endif
yield();
if(mqttHandler->publish(data, &meterState, &ea, eapi)) {
mqtt->loop();
delay(10);
}
}
@@ -1418,23 +1408,8 @@ unsigned long lastWifiRetry = -WIFI_CONNECTION_TIMEOUT;
void WiFi_disconnect(unsigned long timeout) {
if (Debug.isActive(RemoteDebug::INFO)) debugI_P(PSTR("Not connected to WiFi, closing resources"));
if(mqtt != NULL) {
mqtt->disconnect();
mqtt->loop();
delay(10);
yield();
delete mqtt;
mqtt = NULL;
ws.setMqtt(NULL);
}
if(mqttClient != NULL) {
mqttClient->stop();
delete mqttClient;
mqttClient = NULL;
if(mqttSecureClient != NULL) {
mqttSecureClient = NULL;
}
if(mqttHandler != NULL) {
mqttHandler->disconnect();
}
#if defined(ESP8266)
@@ -1638,21 +1613,10 @@ void WiFi_post_connect() {
MqttConfig mqttConfig;
if(config.getMqttConfig(mqttConfig)) {
mqttEnabled = strlen(mqttConfig.host) > 0;
ws.setMqttEnabled(mqttEnabled);
ws.setMqttEnabled(strlen(mqttConfig.host) > 0);
}
}
void mqttMessageReceived(String &topic, String &payload) {
debugI_P(PSTR("Received message for topic %s"), topic.c_str() );
//if(meterConfig.source == METER_SOURCE_MQTT) {
//DataParserContext ctx = {static_cast<uint8_t>(payload.length()/2)};
//fromHex(hanBuffer, payload, ctx.length);
//uint16_t pos = unwrapData(hanBuffer, ctx);
// TODO: Run through DLMS/DMSR parser and apply AmsData
//}
}
int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
int16_t ret = 0;
bool doRet = false;
@@ -1707,19 +1671,15 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
case DATA_TAG_HDLC:
debugV_P(PSTR("HDLC frame:"));
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(buf, curLen));
mqtt->loop();
delay(10);
if(mqttHandler != NULL) {
mqttHandler->publishRaw(toHex(buf, curLen));
}
break;
case DATA_TAG_MBUS:
debugV_P(PSTR("MBUS frame:"));
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(buf, curLen));
mqtt->loop();
delay(10);
if(mqttHandler != NULL) {
mqttHandler->publishRaw(toHex(buf, curLen));
}
break;
case DATA_TAG_GBT:
@@ -1736,10 +1696,8 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
break;
case DATA_TAG_DSMR:
debugV_P(PSTR("DSMR frame:"));
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), (char*) buf);
mqtt->loop();
delay(10);
if(mqttHandler != NULL) {
mqttHandler->publishRaw(String((char*)buf));
}
break;
}
@@ -1773,191 +1731,56 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
unsigned long lastMqttRetry = -10000;
void MQTT_connect() {
if(millis() - lastMqttRetry < (config.isMqttChanged() ? 5000 : 30000)) {
yield();
return;
}
lastMqttRetry = millis();
MqttConfig mqttConfig;
if(!config.getMqttConfig(mqttConfig) || strlen(mqttConfig.host) == 0) {
if(Debug.isActive(RemoteDebug::WARNING)) debugW_P(PSTR("No MQTT config"));
mqttEnabled = false;
ws.setMqttEnabled(false);
return;
}
if(mqtt != NULL) {
if(millis() - lastMqttRetry < (mqtt->lastError() == 0 || config.isMqttChanged() ? 5000 : 30000)) {
yield();
return;
}
lastMqttRetry = millis();
if(Debug.isActive(RemoteDebug::INFO)) {
debugD_P(PSTR("Disconnecting MQTT before connecting"));
}
mqtt->disconnect();
if(config.isMqttChanged()) {
if(mqttSecureClient != NULL) {
mqttSecureClient->stop();
delete mqttSecureClient;
mqttSecureClient = NULL;
} else {
mqttClient->stop();
}
mqttClient = NULL;
}
yield();
} else {
mqtt = new MQTTClient(128);
mqtt->dropOverflow(true);
ws.setMqtt(mqtt);
}
mqttEnabled = true;
ws.setMqttEnabled(true);
topic = String(mqttConfig.publishTopic);
if(mqttHandler != NULL) {
if(mqttHandler != NULL && mqttHandler->getFormat() != mqttConfig.payloadFormat) {
delete mqttHandler;
mqttHandler = NULL;
}
switch(mqttConfig.payloadFormat) {
case 0:
mqttHandler = new JsonMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.clientId, mqttConfig.publishTopic, &hw);
break;
case 1:
case 2:
mqttHandler = new RawMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.publishTopic, mqttConfig.payloadFormat == 2);
break;
case 3:
DomoticzConfig domo;
config.getDomoticzConfig(domo);
mqttHandler = new DomoticzMqttHandler(mqtt, (char*) commonBuffer, domo);
break;
case 4:
HomeAssistantConfig haconf;
config.getHomeAssistantConfig(haconf);
mqttHandler = new HomeAssistantMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.clientId, mqttConfig.publishTopic, sysConfig.boardType, haconf, &hw);
break;
}
time_t epoch = time(nullptr);
if(mqttConfig.ssl) {
if(epoch < FirmwareVersion::BuildEpoch) {
debugI_P(PSTR("NTP not ready for MQTT SSL"));
return;
}
debugI_P(PSTR("MQTT SSL is configured (%dkb free heap)"), ESP.getFreeHeap());
if(mqttSecureClient == NULL) {
mqttSecureClient = new WiFiClientSecure();
#if defined(ESP8266)
mqttSecureClient->setBufferSizes(512, 512);
debugD_P(PSTR("ESP8266 firmware does not have enough memory..."));
return;
#endif
if(LittleFS.begin()) {
File file;
if(LittleFS.exists(FILE_MQTT_CA)) {
debugI_P(PSTR("Found MQTT CA file (%dkb free heap)"), ESP.getFreeHeap());
file = LittleFS.open(FILE_MQTT_CA, (char*) "r");
#if defined(ESP8266)
BearSSL::X509List *serverTrustedCA = new BearSSL::X509List(file);
mqttSecureClient->setTrustAnchors(serverTrustedCA);
#elif defined(ESP32)
if(mqttSecureClient->loadCACert(file, file.size())) {
debugI_P(PSTR("CA accepted"));
} else {
debugW_P(PSTR("CA was rejected"));
delete mqttSecureClient;
mqttSecureClient = NULL;
return;
}
#endif
file.close();
if(LittleFS.exists(FILE_MQTT_CERT) && LittleFS.exists(FILE_MQTT_KEY)) {
#if defined(ESP8266)
debugI_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap());
file = LittleFS.open(FILE_MQTT_CERT, (char*) "r");
BearSSL::X509List *serverCertList = new BearSSL::X509List(file);
file.close();
debugI_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap());
file = LittleFS.open(FILE_MQTT_KEY, (char*) "r");
BearSSL::PrivateKey *serverPrivKey = new BearSSL::PrivateKey(file);
file.close();
debugD_P(PSTR("Setting client certificates (%dkb free heap)"), ESP.getFreeHeap());
mqttSecureClient->setClientRSACert(serverCertList, serverPrivKey);
#elif defined(ESP32)
debugI_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap());
file = LittleFS.open(FILE_MQTT_CERT, (char*) "r");
mqttSecureClient->loadCertificate(file, file.size());
file.close();
debugI_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap());
file = LittleFS.open(FILE_MQTT_KEY, (char*) "r");
mqttSecureClient->loadPrivateKey(file, file.size());
file.close();
#endif
}
} else {
debugI_P(PSTR("No CA, disabling certificate validation"));
mqttSecureClient->setInsecure();
}
mqttClient = mqttSecureClient;
LittleFS.end();
debugD_P(PSTR("MQTT SSL setup complete (%dkb free heap)"), ESP.getFreeHeap());
}
if(mqttHandler == NULL) {
switch(mqttConfig.payloadFormat) {
case 0:
mqttHandler = new JsonMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, &hw);
break;
case 1:
case 2:
mqttHandler = new RawMqttHandler(mqttConfig, &Debug, (char*) commonBuffer);
break;
case 3:
DomoticzConfig domo;
config.getDomoticzConfig(domo);
mqttHandler = new DomoticzMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, domo);
break;
case 4:
HomeAssistantConfig haconf;
config.getHomeAssistantConfig(haconf);
mqttHandler = new HomeAssistantMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, sysConfig.boardType, haconf, &hw);
break;
case 255:
mqttHandler = new PassthroughMqttHandler(mqttConfig, &Debug, (char*) commonBuffer);
break;
}
}
if(mqttClient == NULL) {
debugI_P(PSTR("No SSL, using client without SSL support"));
mqttClient = new WiFiClient();
ws.setMqttHandler(mqttHandler);
if(mqttHandler != NULL) {
mqttHandler->connect();
mqttHandler->publishSystem(&hw, eapi, &ea);
}
if(Debug.isActive(RemoteDebug::INFO)) {
debugI_P(PSTR("Connecting to MQTT %s:%d"), mqttConfig.host, mqttConfig.port);
}
mqtt->begin(mqttConfig.host, mqttConfig.port, *mqttClient);
#if defined(ESP8266)
if(mqttSecureClient) {
time_t epoch = time(nullptr);
debugD_P(PSTR("Setting NTP time %lu for secure MQTT connection"), epoch);
mqttSecureClient->setX509Time(epoch);
}
#endif
// Connect to a unsecure or secure MQTT server
if ((strlen(mqttConfig.username) == 0 && mqtt->connect(mqttConfig.clientId)) ||
(strlen(mqttConfig.username) > 0 && mqtt->connect(mqttConfig.clientId, mqttConfig.username, mqttConfig.password))) {
if (Debug.isActive(RemoteDebug::INFO)) debugI_P(PSTR("Successfully connected to MQTT!"));
if(mqttHandler != NULL) {
mqttHandler->publishSystem(&hw, eapi, &ea);
}
// Subscribe to the chosen MQTT topic, if set in configuration
if (strlen(mqttConfig.subscribeTopic) > 0) {
mqtt->onMessage(mqttMessageReceived);
mqtt->subscribe(String(mqttConfig.subscribeTopic) + "/#");
debugI_P(PSTR(" Subscribing to [%s]\n"), mqttConfig.subscribeTopic);
}
} else {
if (Debug.isActive(RemoteDebug::ERROR)) {
debugE_P(PSTR("Failed to connect to MQTT: %d"), mqtt->lastError());
#if defined(ESP8266)
if(mqttSecureClient) {
mqttSecureClient->getLastSSLError((char*) commonBuffer, BUF_SIZE_COMMON);
Debug.println((char*) commonBuffer);
}
#endif
}
}
yield();
}
void configFileParse() {