mirror of
https://github.com/UtilitechAS/amsreader-firmware.git
synced 2026-01-28 21:10:59 +00:00
Merge branch 'mqtt_changes'
This commit is contained in:
@@ -48,12 +48,13 @@ ADC_MODE(ADC_VCC);
|
||||
#include "RawMqttHandler.h"
|
||||
#include "DomoticzMqttHandler.h"
|
||||
#include "HomeAssistantMqttHandler.h"
|
||||
#include "PassthroughMqttHandler.h"
|
||||
|
||||
#include "Uptime.h"
|
||||
|
||||
#include "RemoteDebug.h"
|
||||
|
||||
#define debugV_P(x, ...) if (Debug.isActive(Debug.VERBOSE)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
|
||||
#define debugV_P(x, ...) if (Debug.isActive(Debug.VERBOSE)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
|
||||
#define debugD_P(x, ...) if (Debug.isActive(Debug.DEBUG)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
|
||||
#define debugI_P(x, ...) if (Debug.isActive(Debug.INFO)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
|
||||
#define debugW_P(x, ...) if (Debug.isActive(Debug.WARNING)) {Debug.printf_P(x, ##__VA_ARGS__);Debug.println();}
|
||||
@@ -89,11 +90,32 @@ Timezone* tz = NULL;
|
||||
|
||||
AmsWebServer ws(commonBuffer, &Debug, &hw);
|
||||
|
||||
MQTTClient *mqtt = NULL;
|
||||
WiFiClient *mqttClient = NULL;
|
||||
WiFiClientSecure *mqttSecureClient = NULL;
|
||||
bool mqttEnabled = false;
|
||||
AmsMqttHandler* mqttHandler = NULL;
|
||||
|
||||
#if defined(ESP32)
|
||||
JsonMqttHandler* energySpeedometer = NULL;
|
||||
MqttConfig energySpeedometerConfig = {
|
||||
"mqtt.sandtime.energy",
|
||||
8883,
|
||||
"",
|
||||
"amsleser",
|
||||
"",
|
||||
#if defined(ENERGY_SPEEDOMETER_USER)
|
||||
ENERGY_SPEEDOMETER_USER,
|
||||
#else
|
||||
"",
|
||||
#endif
|
||||
#if defined(ENERGY_SPEEDOMETER_PASS)
|
||||
ENERGY_SPEEDOMETER_PASS,
|
||||
#else
|
||||
"",
|
||||
#endif
|
||||
0,
|
||||
true
|
||||
};
|
||||
#endif
|
||||
|
||||
Stream *hanSerial;
|
||||
SoftwareSerial *swSerial = NULL;
|
||||
HardwareSerial *hwSerial = NULL;
|
||||
@@ -102,8 +124,6 @@ uint8_t rxBufferErrors = 0;
|
||||
SystemConfig sysConfig;
|
||||
GpioConfig gpioConfig;
|
||||
MeterConfig meterConfig;
|
||||
bool mqttEnabled = false;
|
||||
String topic = "ams";
|
||||
AmsData meterState;
|
||||
bool ntpEnabled = false;
|
||||
|
||||
@@ -533,15 +553,49 @@ void loop() {
|
||||
#endif
|
||||
|
||||
if (mqttEnabled || config.isMqttChanged()) {
|
||||
if(mqtt == NULL || !mqtt->connected() || config.isMqttChanged()) {
|
||||
if(mqttHandler == NULL || !mqttHandler->connected() || config.isMqttChanged()) {
|
||||
MQTT_connect();
|
||||
config.ackMqttChange();
|
||||
}
|
||||
} else if(mqtt != NULL && mqtt->connected()) {
|
||||
mqttClient->stop();
|
||||
mqtt->disconnect();
|
||||
} else if(mqttHandler != NULL) {
|
||||
mqttHandler->disconnect();
|
||||
}
|
||||
|
||||
#if defined(ENERGY_SPEEDOMETER_PASS)
|
||||
if(sysConfig.energyspeedometer == 7) {
|
||||
if(!meterState.getMeterId().isEmpty()) {
|
||||
if(energySpeedometer == NULL) {
|
||||
uint16_t chipId;
|
||||
#if defined(ESP32)
|
||||
chipId = ( ESP.getEfuseMac() >> 32 ) % 0xFFFFFFFF;
|
||||
#else
|
||||
chipId = ESP.getChipId();
|
||||
#endif
|
||||
strcpy(energySpeedometerConfig.clientId, (String("ams") + String(chipId, HEX)).c_str());
|
||||
energySpeedometer = new JsonMqttHandler(energySpeedometerConfig, &Debug, (char*) commonBuffer, &hw);
|
||||
energySpeedometer->setCaVerification(false);
|
||||
}
|
||||
if(!energySpeedometer->connected()) {
|
||||
lwmqtt_err_t err = energySpeedometer->lastError();
|
||||
if(err > 0)
|
||||
debugE_P(PSTR("Energyspeedometer connector reporting error (%d)"), err);
|
||||
energySpeedometer->connect();
|
||||
energySpeedometer->publishSystem(&hw, eapi, &ea);
|
||||
}
|
||||
energySpeedometer->loop();
|
||||
delay(10);
|
||||
}
|
||||
} else if(energySpeedometer != NULL) {
|
||||
if(energySpeedometer->connected()) {
|
||||
energySpeedometer->disconnect();
|
||||
energySpeedometer->loop();
|
||||
} else {
|
||||
delete energySpeedometer;
|
||||
energySpeedometer = NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
try {
|
||||
handlePriceApi(now);
|
||||
} catch(const std::exception& e) {
|
||||
@@ -554,9 +608,9 @@ void loop() {
|
||||
debugW_P(PSTR("Used %dms to handle web"), millis()-start);
|
||||
}
|
||||
}
|
||||
if(mqtt != NULL) {
|
||||
if(mqttHandler != NULL) {
|
||||
start = millis();
|
||||
mqtt->loop();
|
||||
mqttHandler->loop();
|
||||
delay(10); // Needed to preserve power. After adding this, the voltage is super smooth on a HAN powered device
|
||||
end = millis();
|
||||
if(end - start > 1000) {
|
||||
@@ -687,11 +741,23 @@ void handleNtpChange() {
|
||||
}
|
||||
|
||||
void handleSystem(unsigned long now) {
|
||||
if(config.isSystemConfigChanged()) {
|
||||
config.getSystemConfig(sysConfig);
|
||||
config.ackSystemConfigChanged();
|
||||
}
|
||||
|
||||
unsigned long start, end;
|
||||
if(now - lastSysupdate > 60000) {
|
||||
start = millis();
|
||||
if(mqtt != NULL && mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED && mqtt->connected() && !topic.isEmpty()) {
|
||||
mqttHandler->publishSystem(&hw, eapi, &ea);
|
||||
if(WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED) {
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->publishSystem(&hw, eapi, &ea);
|
||||
}
|
||||
#if defined(ENERGY_SPEEDOMETER_PASS)
|
||||
if(energySpeedometer != NULL) {
|
||||
energySpeedometer->publishSystem(&hw, eapi, &ea);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
lastSysupdate = now;
|
||||
end = millis();
|
||||
@@ -738,7 +804,7 @@ void handleTemperature(unsigned long now) {
|
||||
if(hw.updateTemperatures()) {
|
||||
lastTemperatureRead = now;
|
||||
|
||||
if(mqtt != NULL && mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED && mqtt->connected() && !topic.isEmpty()) {
|
||||
if(mqttHandler != NULL && WiFi.getMode() != WIFI_AP && WiFi.status() == WL_CONNECTED) {
|
||||
mqttHandler->publishTemperatures(&config, &hw);
|
||||
}
|
||||
}
|
||||
@@ -753,7 +819,7 @@ void handlePriceApi(unsigned long now) {
|
||||
unsigned long start, end;
|
||||
if(eapi != NULL && ntpEnabled) {
|
||||
start = millis();
|
||||
if(eapi->loop() && mqtt != NULL && mqttHandler != NULL && mqtt->connected()) {
|
||||
if(eapi->loop() && mqttHandler != NULL) {
|
||||
end = millis();
|
||||
if(end - start > 1000) {
|
||||
debugW_P(PSTR("Used %dms to update prices"), millis()-start);
|
||||
@@ -1078,7 +1144,7 @@ void errorBlink() {
|
||||
}
|
||||
break;
|
||||
case 1:
|
||||
if(mqttEnabled && mqtt != NULL && mqtt->lastError() != 0) {
|
||||
if(mqttHandler != NULL && mqttHandler->lastError() != 0) {
|
||||
debugW_P(PSTR("MQTT connection not available, double blink"));
|
||||
hw.ledBlink(LED_RED, 2); // If MQTT error, blink twice
|
||||
return;
|
||||
@@ -1221,10 +1287,8 @@ bool readHanPort() {
|
||||
meterState.setLastError(pos);
|
||||
printHanReadError(pos);
|
||||
len += hanSerial->readBytes(hanBuffer+len, BUF_SIZE_HAN-len);
|
||||
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
|
||||
mqtt->publish(topic.c_str(), toHex(hanBuffer+pos, len));
|
||||
mqtt->loop();
|
||||
delay(10);
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->publishRaw(toHex(hanBuffer+pos, len));
|
||||
}
|
||||
while(hanSerial->available()) hanSerial->read(); // Make sure it is all empty, in case we overflowed buffer above
|
||||
len = 0;
|
||||
@@ -1242,10 +1306,8 @@ bool readHanPort() {
|
||||
if(maxDetectedPayloadSize < pos) maxDetectedPayloadSize = pos;
|
||||
if(ctx.type == DATA_TAG_DLMS) {
|
||||
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
|
||||
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
|
||||
mqtt->publish(topic.c_str(), toHex((byte*) payload, ctx.length));
|
||||
mqtt->loop();
|
||||
delay(10);
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->publishRaw(toHex((byte*) payload, ctx.length));
|
||||
}
|
||||
|
||||
debugV_P(PSTR("Using application data:"));
|
||||
@@ -1300,7 +1362,7 @@ void handleDataSuccess(AmsData* data) {
|
||||
if(!hw.ledBlink(LED_GREEN, 1))
|
||||
hw.ledBlink(LED_INTERNAL, 1);
|
||||
|
||||
if(mqttEnabled && mqttHandler != NULL && mqtt != NULL) {
|
||||
if(mqttHandler != NULL) {
|
||||
#if defined(ESP32)
|
||||
esp_task_wdt_reset();
|
||||
#elif defined(ESP8266)
|
||||
@@ -1308,10 +1370,14 @@ void handleDataSuccess(AmsData* data) {
|
||||
#endif
|
||||
yield();
|
||||
if(mqttHandler->publish(data, &meterState, &ea, eapi)) {
|
||||
mqtt->loop();
|
||||
delay(10);
|
||||
}
|
||||
}
|
||||
#if defined(ENERGY_SPEEDOMETER_PASS)
|
||||
if(energySpeedometer != NULL && energySpeedometer->publish(&meterState, &meterState, &ea, eapi)) {
|
||||
delay(10);
|
||||
}
|
||||
#endif
|
||||
|
||||
time_t now = time(nullptr);
|
||||
if(now < FirmwareVersion::BuildEpoch && data->getListType() >= 3) {
|
||||
@@ -1418,23 +1484,8 @@ unsigned long lastWifiRetry = -WIFI_CONNECTION_TIMEOUT;
|
||||
|
||||
void WiFi_disconnect(unsigned long timeout) {
|
||||
if (Debug.isActive(RemoteDebug::INFO)) debugI_P(PSTR("Not connected to WiFi, closing resources"));
|
||||
if(mqtt != NULL) {
|
||||
mqtt->disconnect();
|
||||
mqtt->loop();
|
||||
delay(10);
|
||||
yield();
|
||||
delete mqtt;
|
||||
mqtt = NULL;
|
||||
ws.setMqtt(NULL);
|
||||
}
|
||||
|
||||
if(mqttClient != NULL) {
|
||||
mqttClient->stop();
|
||||
delete mqttClient;
|
||||
mqttClient = NULL;
|
||||
if(mqttSecureClient != NULL) {
|
||||
mqttSecureClient = NULL;
|
||||
}
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->disconnect();
|
||||
}
|
||||
|
||||
#if defined(ESP8266)
|
||||
@@ -1647,16 +1698,6 @@ void WiFi_post_connect() {
|
||||
}
|
||||
}
|
||||
|
||||
void mqttMessageReceived(String &topic, String &payload) {
|
||||
debugI_P(PSTR("Received message for topic %s"), topic.c_str() );
|
||||
//if(meterConfig.source == METER_SOURCE_MQTT) {
|
||||
//DataParserContext ctx = {static_cast<uint8_t>(payload.length()/2)};
|
||||
//fromHex(hanBuffer, payload, ctx.length);
|
||||
//uint16_t pos = unwrapData(hanBuffer, ctx);
|
||||
// TODO: Run through DLMS/DMSR parser and apply AmsData
|
||||
//}
|
||||
}
|
||||
|
||||
int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
|
||||
int16_t ret = 0;
|
||||
bool doRet = false;
|
||||
@@ -1711,19 +1752,15 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
|
||||
case DATA_TAG_HDLC:
|
||||
debugV_P(PSTR("HDLC frame:"));
|
||||
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
|
||||
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
|
||||
mqtt->publish(topic.c_str(), toHex(buf, curLen));
|
||||
mqtt->loop();
|
||||
delay(10);
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->publishRaw(toHex(buf, curLen));
|
||||
}
|
||||
break;
|
||||
case DATA_TAG_MBUS:
|
||||
debugV_P(PSTR("MBUS frame:"));
|
||||
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
|
||||
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
|
||||
mqtt->publish(topic.c_str(), toHex(buf, curLen));
|
||||
mqtt->loop();
|
||||
delay(10);
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->publishRaw(toHex(buf, curLen));
|
||||
}
|
||||
break;
|
||||
case DATA_TAG_GBT:
|
||||
@@ -1740,10 +1777,8 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
|
||||
break;
|
||||
case DATA_TAG_DSMR:
|
||||
debugV_P(PSTR("DSMR frame:"));
|
||||
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
|
||||
mqtt->publish(topic.c_str(), (char*) buf);
|
||||
mqtt->loop();
|
||||
delay(10);
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->publishRaw(String((char*)buf));
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -1777,191 +1812,57 @@ int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
|
||||
|
||||
unsigned long lastMqttRetry = -10000;
|
||||
void MQTT_connect() {
|
||||
if(millis() - lastMqttRetry < (config.isMqttChanged() ? 5000 : 30000)) {
|
||||
yield();
|
||||
return;
|
||||
}
|
||||
lastMqttRetry = millis();
|
||||
|
||||
MqttConfig mqttConfig;
|
||||
if(!config.getMqttConfig(mqttConfig) || strlen(mqttConfig.host) == 0) {
|
||||
if(Debug.isActive(RemoteDebug::WARNING)) debugW_P(PSTR("No MQTT config"));
|
||||
mqttEnabled = false;
|
||||
ws.setMqttEnabled(false);
|
||||
mqttEnabled = false;
|
||||
return;
|
||||
}
|
||||
if(mqtt != NULL) {
|
||||
if(millis() - lastMqttRetry < (mqtt->lastError() == 0 || config.isMqttChanged() ? 5000 : 30000)) {
|
||||
yield();
|
||||
return;
|
||||
}
|
||||
lastMqttRetry = millis();
|
||||
|
||||
if(Debug.isActive(RemoteDebug::INFO)) {
|
||||
debugD_P(PSTR("Disconnecting MQTT before connecting"));
|
||||
}
|
||||
|
||||
mqtt->disconnect();
|
||||
if(config.isMqttChanged()) {
|
||||
if(mqttSecureClient != NULL) {
|
||||
mqttSecureClient->stop();
|
||||
delete mqttSecureClient;
|
||||
mqttSecureClient = NULL;
|
||||
} else {
|
||||
mqttClient->stop();
|
||||
}
|
||||
mqttClient = NULL;
|
||||
}
|
||||
yield();
|
||||
} else {
|
||||
mqtt = new MQTTClient(256);
|
||||
mqtt->dropOverflow(true);
|
||||
ws.setMqtt(mqtt);
|
||||
}
|
||||
|
||||
mqttEnabled = true;
|
||||
ws.setMqttEnabled(true);
|
||||
topic = String(mqttConfig.publishTopic);
|
||||
|
||||
if(mqttHandler != NULL) {
|
||||
if(mqttHandler != NULL && mqttHandler->getFormat() != mqttConfig.payloadFormat) {
|
||||
delete mqttHandler;
|
||||
mqttHandler = NULL;
|
||||
}
|
||||
|
||||
switch(mqttConfig.payloadFormat) {
|
||||
case 0:
|
||||
mqttHandler = new JsonMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.clientId, mqttConfig.publishTopic, &hw);
|
||||
break;
|
||||
case 1:
|
||||
case 2:
|
||||
mqttHandler = new RawMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.publishTopic, mqttConfig.payloadFormat == 2);
|
||||
break;
|
||||
case 3:
|
||||
DomoticzConfig domo;
|
||||
config.getDomoticzConfig(domo);
|
||||
mqttHandler = new DomoticzMqttHandler(mqtt, (char*) commonBuffer, domo);
|
||||
break;
|
||||
case 4:
|
||||
HomeAssistantConfig haconf;
|
||||
config.getHomeAssistantConfig(haconf);
|
||||
mqttHandler = new HomeAssistantMqttHandler(mqtt, (char*) commonBuffer, mqttConfig.clientId, mqttConfig.publishTopic, sysConfig.boardType, haconf, &hw);
|
||||
break;
|
||||
}
|
||||
|
||||
time_t epoch = time(nullptr);
|
||||
if(mqttConfig.ssl) {
|
||||
if(epoch < FirmwareVersion::BuildEpoch) {
|
||||
debugI_P(PSTR("NTP not ready for MQTT SSL"));
|
||||
return;
|
||||
}
|
||||
debugI_P(PSTR("MQTT SSL is configured (%dkb free heap)"), ESP.getFreeHeap());
|
||||
if(mqttSecureClient == NULL) {
|
||||
mqttSecureClient = new WiFiClientSecure();
|
||||
#if defined(ESP8266)
|
||||
mqttSecureClient->setBufferSizes(512, 512);
|
||||
debugD_P(PSTR("ESP8266 firmware does not have enough memory..."));
|
||||
return;
|
||||
#endif
|
||||
|
||||
if(LittleFS.begin()) {
|
||||
File file;
|
||||
|
||||
if(LittleFS.exists(FILE_MQTT_CA)) {
|
||||
debugI_P(PSTR("Found MQTT CA file (%dkb free heap)"), ESP.getFreeHeap());
|
||||
file = LittleFS.open(FILE_MQTT_CA, (char*) "r");
|
||||
#if defined(ESP8266)
|
||||
BearSSL::X509List *serverTrustedCA = new BearSSL::X509List(file);
|
||||
mqttSecureClient->setTrustAnchors(serverTrustedCA);
|
||||
#elif defined(ESP32)
|
||||
if(mqttSecureClient->loadCACert(file, file.size())) {
|
||||
debugI_P(PSTR("CA accepted"));
|
||||
} else {
|
||||
debugW_P(PSTR("CA was rejected"));
|
||||
delete mqttSecureClient;
|
||||
mqttSecureClient = NULL;
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
file.close();
|
||||
|
||||
if(LittleFS.exists(FILE_MQTT_CERT) && LittleFS.exists(FILE_MQTT_KEY)) {
|
||||
#if defined(ESP8266)
|
||||
debugI_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap());
|
||||
file = LittleFS.open(FILE_MQTT_CERT, (char*) "r");
|
||||
BearSSL::X509List *serverCertList = new BearSSL::X509List(file);
|
||||
file.close();
|
||||
|
||||
debugI_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap());
|
||||
file = LittleFS.open(FILE_MQTT_KEY, (char*) "r");
|
||||
BearSSL::PrivateKey *serverPrivKey = new BearSSL::PrivateKey(file);
|
||||
file.close();
|
||||
|
||||
debugD_P(PSTR("Setting client certificates (%dkb free heap)"), ESP.getFreeHeap());
|
||||
mqttSecureClient->setClientRSACert(serverCertList, serverPrivKey);
|
||||
#elif defined(ESP32)
|
||||
debugI_P(PSTR("Found MQTT certificate file (%dkb free heap)"), ESP.getFreeHeap());
|
||||
file = LittleFS.open(FILE_MQTT_CERT, (char*) "r");
|
||||
mqttSecureClient->loadCertificate(file, file.size());
|
||||
file.close();
|
||||
|
||||
debugI_P(PSTR("Found MQTT key file (%dkb free heap)"), ESP.getFreeHeap());
|
||||
file = LittleFS.open(FILE_MQTT_KEY, (char*) "r");
|
||||
mqttSecureClient->loadPrivateKey(file, file.size());
|
||||
file.close();
|
||||
#endif
|
||||
}
|
||||
} else {
|
||||
debugI_P(PSTR("No CA, disabling certificate validation"));
|
||||
mqttSecureClient->setInsecure();
|
||||
}
|
||||
mqttClient = mqttSecureClient;
|
||||
|
||||
LittleFS.end();
|
||||
debugD_P(PSTR("MQTT SSL setup complete (%dkb free heap)"), ESP.getFreeHeap());
|
||||
}
|
||||
if(mqttHandler == NULL) {
|
||||
switch(mqttConfig.payloadFormat) {
|
||||
case 0:
|
||||
mqttHandler = new JsonMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, &hw);
|
||||
break;
|
||||
case 1:
|
||||
case 2:
|
||||
mqttHandler = new RawMqttHandler(mqttConfig, &Debug, (char*) commonBuffer);
|
||||
break;
|
||||
case 3:
|
||||
DomoticzConfig domo;
|
||||
config.getDomoticzConfig(domo);
|
||||
mqttHandler = new DomoticzMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, domo);
|
||||
break;
|
||||
case 4:
|
||||
HomeAssistantConfig haconf;
|
||||
config.getHomeAssistantConfig(haconf);
|
||||
mqttHandler = new HomeAssistantMqttHandler(mqttConfig, &Debug, (char*) commonBuffer, sysConfig.boardType, haconf, &hw);
|
||||
break;
|
||||
case 255:
|
||||
mqttHandler = new PassthroughMqttHandler(mqttConfig, &Debug, (char*) commonBuffer);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(mqttClient == NULL) {
|
||||
debugI_P(PSTR("No SSL, using client without SSL support"));
|
||||
mqttClient = new WiFiClient();
|
||||
ws.setMqttHandler(mqttHandler);
|
||||
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->connect();
|
||||
mqttHandler->publishSystem(&hw, eapi, &ea);
|
||||
}
|
||||
|
||||
if(Debug.isActive(RemoteDebug::INFO)) {
|
||||
debugI_P(PSTR("Connecting to MQTT %s:%d"), mqttConfig.host, mqttConfig.port);
|
||||
}
|
||||
|
||||
mqtt->begin(mqttConfig.host, mqttConfig.port, *mqttClient);
|
||||
|
||||
#if defined(ESP8266)
|
||||
if(mqttSecureClient) {
|
||||
time_t epoch = time(nullptr);
|
||||
debugD_P(PSTR("Setting NTP time %lu for secure MQTT connection"), epoch);
|
||||
mqttSecureClient->setX509Time(epoch);
|
||||
}
|
||||
#endif
|
||||
|
||||
// Connect to a unsecure or secure MQTT server
|
||||
if ((strlen(mqttConfig.username) == 0 && mqtt->connect(mqttConfig.clientId)) ||
|
||||
(strlen(mqttConfig.username) > 0 && mqtt->connect(mqttConfig.clientId, mqttConfig.username, mqttConfig.password))) {
|
||||
if (Debug.isActive(RemoteDebug::INFO)) debugI_P(PSTR("Successfully connected to MQTT!"));
|
||||
|
||||
if(mqttHandler != NULL) {
|
||||
mqttHandler->publishSystem(&hw, eapi, &ea);
|
||||
}
|
||||
|
||||
// Subscribe to the chosen MQTT topic, if set in configuration
|
||||
if (strlen(mqttConfig.subscribeTopic) > 0) {
|
||||
mqtt->onMessage(mqttMessageReceived);
|
||||
mqtt->subscribe(String(mqttConfig.subscribeTopic) + "/#");
|
||||
debugI_P(PSTR(" Subscribing to [%s]\n"), mqttConfig.subscribeTopic);
|
||||
}
|
||||
} else {
|
||||
if (Debug.isActive(RemoteDebug::ERROR)) {
|
||||
debugE_P(PSTR("Failed to connect to MQTT: %d"), mqtt->lastError());
|
||||
#if defined(ESP8266)
|
||||
if(mqttSecureClient) {
|
||||
mqttSecureClient->getLastSSLError((char*) commonBuffer, BUF_SIZE_COMMON);
|
||||
Debug.println((char*) commonBuffer);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
yield();
|
||||
}
|
||||
|
||||
void configFileParse() {
|
||||
|
||||
28
src/PassthroughMqttHandler.cpp
Normal file
28
src/PassthroughMqttHandler.cpp
Normal file
@@ -0,0 +1,28 @@
|
||||
#include "PassthroughMqttHandler.h"
|
||||
|
||||
bool PassthroughMqttHandler::publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PassthroughMqttHandler::publishTemperatures(AmsConfiguration*, HwTools*) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PassthroughMqttHandler::publishPrices(EntsoeApi*) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PassthroughMqttHandler::publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PassthroughMqttHandler::publishRaw(String data) {
|
||||
bool ret = mqtt.publish(mqttConfig.publishTopic, data);
|
||||
loop();
|
||||
delay(10);
|
||||
return ret;
|
||||
}
|
||||
|
||||
uint8_t PassthroughMqttHandler::getFormat() {
|
||||
return 255;
|
||||
}
|
||||
17
src/PassthroughMqttHandler.h
Normal file
17
src/PassthroughMqttHandler.h
Normal file
@@ -0,0 +1,17 @@
|
||||
#ifndef _PASSTHROUGHMQTTHANDLER_H
|
||||
#define _PASSTHROUGHMQTTHANDLER_H
|
||||
|
||||
#include "AmsMqttHandler.h"
|
||||
|
||||
class PassthroughMqttHandler : public AmsMqttHandler {
|
||||
public:
|
||||
PassthroughMqttHandler(MqttConfig& mqttConfig, RemoteDebug* debugger, char* buf) : AmsMqttHandler(mqttConfig, debugger, buf) {};
|
||||
bool publish(AmsData* data, AmsData* previousState, EnergyAccounting* ea, EntsoeApi* eapi);
|
||||
bool publishTemperatures(AmsConfiguration*, HwTools*);
|
||||
bool publishPrices(EntsoeApi*);
|
||||
bool publishSystem(HwTools* hw, EntsoeApi* eapi, EnergyAccounting* ea);
|
||||
bool publishRaw(String data);
|
||||
|
||||
uint8_t getFormat();
|
||||
};
|
||||
#endif
|
||||
Reference in New Issue
Block a user