Restructured AMS data parsing

This commit is contained in:
Gunnar Skjold
2022-06-12 09:46:20 +02:00
parent 0dca85d67b
commit fca46a3f54
31 changed files with 946 additions and 913 deletions

View File

@@ -62,13 +62,12 @@ ADC_MODE(ADC_VCC);
#define BUF_SIZE_COMMON (2048)
#define BUF_SIZE_HAN (1024)
#include "ams/hdlc.h"
#include "MbusAssembler.h"
#include "GBTAssembler.h"
#include "IEC6205621.h"
#include "IEC6205675.h"
#include "ams/DataParsers.h"
uint8_t commonBuffer[BUF_SIZE_COMMON];
uint8_t hanBuffer[BUF_SIZE_HAN];
@@ -93,7 +92,6 @@ AmsMqttHandler* mqttHandler = NULL;
Stream *hanSerial;
SoftwareSerial *swSerial = NULL;
HDLCConfig* hc = NULL;
GpioConfig gpioConfig;
MeterConfig meterConfig;
@@ -107,6 +105,14 @@ EnergyAccounting ea(&Debug);
uint8_t wifiReconnectCount = 0;
HDLCParser *hdlcParser = NULL;
MBUSParser *mbusParser = NULL;
GBTParser *gbtParser = NULL;
GCMParser *gcmParser = NULL;
LLCParser *llcParser = NULL;
DLMSParser *dlmsParser = NULL;
DSMRParser *dsmrParser = NULL;
void setup() {
WiFiConfig wifi;
Serial.begin(115200);
@@ -554,8 +560,8 @@ void loop() {
config.getMeterConfig(meterConfig);
setupHanPort(gpioConfig.hanPin, meterConfig.baud, meterConfig.parity, meterConfig.invert);
config.ackMeterChanged();
delete hc;
hc = NULL;
delete gcmParser;
gcmParser = NULL;
}
if(config.isEnergyAccountingChanged()) {
@@ -768,225 +774,83 @@ void swapWifiMode() {
}
int len = 0;
MbusAssembler* ma = NULL;
GBTAssembler* ga = NULL;
int currentMeterType = -1;
bool serialInit = false;
bool readHanPort() {
if(!hanSerial->available()) return false;
// Before autodetect starts, empty serial buffer to increase chance of getting first byte of a data transfer
if(currentMeterType == -1) {
// Before reading, empty serial buffer to increase chance of getting first byte of a data transfer
if(!serialInit) {
hanSerial->readBytes(hanBuffer, BUF_SIZE_HAN);
currentMeterType = 0; // Start autodetection
serialInit = true;
return false;
}
// Data type autodetect
if(currentMeterType == 0) {
uint8_t flag = hanSerial->read();
if(flag == 0x7E || flag == 0x68) {
debugD("HDLC or MBUS");
currentMeterType = 1;
} else if(flag == 0xDB) {
debugD("Encrypted DSMR");
hc = new HDLCConfig();
memcpy(hc->encryption_key, meterConfig.encryptionKey, 16);
memcpy(hc->authentication_key, meterConfig.authenticationKey, 16);
currentMeterType = 2;
} else if(flag == 0x2F) {
debugD("DSMR");
currentMeterType = 2;
} else {
currentMeterType = -1; // Unable to detect, reset to flush serial buffer
}
// Empty serial buffer before continuing
hanSerial->readBytes(hanBuffer, BUF_SIZE_HAN);
return false;
}
CosemDateTime timestamp = {0};
HDLCContext context = {0,0,0};
AmsData data;
if(currentMeterType == 1) { // DLMS
int pos = HDLC_FRAME_INCOMPLETE;
// For each byte received, check if we have a complete HDLC (or MBUS) frame we can handle
while(hanSerial->available() && pos == HDLC_FRAME_INCOMPLETE) {
hanBuffer[len++] = hanSerial->read();
pos = HDLC_validate((uint8_t *) hanBuffer, len, hc, &timestamp, &context);
}
if(len > 0) {
// If buffer was overflowed, reset
if(len >= BUF_SIZE_HAN) {
hanSerial->readBytes(hanBuffer, BUF_SIZE_HAN);
len = 0;
debugI("Buffer overflow, resetting");
return false;
}
// In case we get segmented MBUS frames, assemble before parsing
if(pos == MBUS_FRAME_INTERMEDIATE_SEGMENT) {
debugI("Intermediate segment");
if(ma == NULL) {
ma = new MbusAssembler();
}
if(ma->append((uint8_t *) hanBuffer, len) < 0) {
debugE("MBUS assembler failed");
len = 0;
return false;
}
if(Debug.isActive(RemoteDebug::VERBOSE)) {
debugD("Intermediate degment dump (%db):", len);
debugPrint(hanBuffer, 0, len);
}
len = 0;
return false;
} else if(pos == MBUS_FRAME_LAST_SEGMENT) {
debugI("Final segment");
if(Debug.isActive(RemoteDebug::VERBOSE)) {
debugD("Final segment dump (%db):", len);
debugPrint(hanBuffer, 0, len);
}
if(ma->append((uint8_t *) hanBuffer, len) >= 0) {
len = ma->write((uint8_t *) hanBuffer);
pos = HDLC_validate((uint8_t *) hanBuffer, len, hc, &timestamp, &context);
} else {
debugE("MBUS assembler failed");
len = 0;
return false;
}
}
// In case we get segmented HDLC frames (General Block Transfer), assemble before parsing
if(pos == HDLC_GBT_INTERMEDIATE) {
debugI("Intermediate block");
if(ga == NULL) {
ga = new GBTAssembler();
}
if(ga->append(&context, (uint8_t *) hanBuffer, len, &Debug) < 0) {
debugE("GBT assembler failed");
len = 0;
return false;
}
if(Debug.isActive(RemoteDebug::VERBOSE)) {
debugD("Intermediate block dump (%db):", len);
debugPrint(hanBuffer, 0, len);
}
len = 0;
return false;
} else if(pos == HDLC_GBT_LAST) {
debugI("Final block");
if(Debug.isActive(RemoteDebug::VERBOSE)) {
debugD("Final block dump (%db):", len);
debugPrint(hanBuffer, 0, len);
}
if(ga->append(&context, (uint8_t *) hanBuffer, len, &Debug) >= 0) {
len = ga->write((uint8_t *) hanBuffer);
pos = HDLC_validate((uint8_t *) hanBuffer, len, hc, &timestamp, &context);
} else {
debugE("GBT assembler failed");
len = 0;
return false;
}
}
// Encryption, but config was not initialized
if(pos == HDLC_ENCRYPTION_CONFIG_MISSING) {
hc = new HDLCConfig();
memcpy(hc->encryption_key, meterConfig.encryptionKey, 16);
memcpy(hc->authentication_key, meterConfig.authenticationKey, 16);
pos = HDLC_validate((uint8_t *) hanBuffer, len, hc, &timestamp, &context);
}
// Received frame was incomplete, return to loop and wait for more data
if(pos == HDLC_FRAME_INCOMPLETE) {
return false;
}
// Data is valid, clear the rest of the buffer to avoid tainted read
for(int i = len; i<BUF_SIZE_HAN; i++) {
hanBuffer[i] = 0x00;
}
if(Debug.isActive(RemoteDebug::VERBOSE)) {
debugW("APDU tag %02X", context.apdu);
debugW("APDU start %d", context.apduStart);
debugD("Frame dump (%db):", len);
debugPrint(hanBuffer, 0, len);
}
if(hc != NULL && Debug.isActive(RemoteDebug::VERBOSE)) {
debugD("System title:");
debugPrint(hc->system_title, 0, 8);
debugD("Initialization vector:");
debugPrint(hc->initialization_vector, 0, 12);
debugD("Additional authenticated data:");
debugPrint(hc->additional_authenticated_data, 0, 17);
debugD("Authentication tag:");
debugPrint(hc->authentication_tag, 0, 12);
}
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(hanBuffer, len));
}
len = 0; // Reset length for next frame
if(pos > 0) {
// Parse valid data
debugD("Valid data, start at byte %d", pos);
// TODO: Split IEC6205675 into DataParserKaifa and DataParserObis. This way we can add other means of parsing, for those other proprietary formats
data = IEC6205675(((char *) (hanBuffer)) + pos, meterState.getMeterType(), &meterConfig, timestamp, hc);
} else {
printHanReadError(pos);
return false;
}
} else {
return false;
}
} else if(currentMeterType == 2) { // DSMR
int pos = HDLC_FRAME_INCOMPLETE;
if(hc != NULL) {
while(hanSerial->available() && pos == HDLC_FRAME_INCOMPLETE) {
hanBuffer[len++] = hanSerial->read();
pos = mbus_decrypt((uint8_t *) hanBuffer, len, hc);
}
} else {
while(hanSerial->available()) {
hanBuffer[len++] = hanSerial->read();
}
if(len > 10) {
String end = String((char*) hanBuffer+len-5);
if(end.startsWith("!")) pos = 0;
while(hanSerial->available()) hanSerial->read();
}
}
if(len == 0) return false;
if(pos == HDLC_FRAME_INCOMPLETE) return false;
DataParserContext ctx = {0};
int pos = DATA_PARSE_INCOMPLETE;
// For each byte received, check if we have a complete frame we can handle
while(hanSerial->available() && pos == DATA_PARSE_INCOMPLETE) {
// If buffer was overflowed, reset
if(len >= BUF_SIZE_HAN) {
hanSerial->readBytes(hanBuffer, BUF_SIZE_HAN);
len = 0;
debugI("Buffer overflow, resetting");
return false;
}
if(pos < 0) {
printHanReadError(pos);
while(hanSerial->available()) hanSerial->read();
len = 0;
return false;
}
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), (char*) hanBuffer);
}
len = 0;
data = IEC6205621(((char *) (hanBuffer)) + pos);
if(data.getListType() == 0) {
currentMeterType = 0; // Did not receive valid data, go bach to autodetect
return false;
} else {
if(Debug.isActive(RemoteDebug::DEBUG)) {
debugD("Frame dump: %d", strlen((char*) (hanBuffer+pos)));
debugD("%s", hanBuffer+pos);
hanBuffer[len++] = hanSerial->read();
ctx.length = len;
pos = unwrapData((uint8_t *) hanBuffer, ctx);
if(pos >= 0) {
if(ctx.type == DATA_TAG_DLMS) {
debugV("Received valid DLMS at %d", pos);
} else if(ctx.type == DATA_TAG_DSMR) {
debugV("Received valid DSMR at %d", pos);
} else {
// TODO: Move this so that payload is sent to MQTT
debugE("Unknown tag %02X at pos %d", ctx.type, pos);
len = 0;
return false;
}
}
for(int i = len; i<BUF_SIZE_HAN; i++) hanBuffer[i] = 0x00;
}
if(pos == DATA_PARSE_INCOMPLETE) {
return false;
}
if(pos == DATA_PARSE_INTERMEDIATE_SEGMENT) {
len = 0;
return false;
} else if(pos < 0) {
printHanReadError(pos);
len += hanSerial->readBytes(hanBuffer+len, BUF_SIZE_HAN-len);
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(hanBuffer+pos, len));
}
while(hanSerial->available()) hanSerial->read(); // Make sure it is all empty, in case we overflowed buffer above
len = 0;
return false;
}
// Data is valid, clear the rest of the buffer to avoid tainted parsing
for(int i = pos+ctx.length; i<BUF_SIZE_HAN; i++) {
hanBuffer[i] = 0x00;
}
AmsData data;
if(ctx.type == DATA_TAG_DLMS) {
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(hanBuffer+pos, ctx.length));
}
debugV("Using application data:");
debugPrint(hanBuffer+pos, 0, ctx.length);
// TODO: Split IEC6205675 into DataParserKaifa and DataParserObis. This way we can add other means of parsing, for those other proprietary formats
data = IEC6205675(((char *) (hanBuffer)) + pos, meterState.getMeterType(), &meterConfig, ctx);
} else if(ctx.type == DATA_TAG_DSMR) {
data = IEC6205621(((char *) (hanBuffer)) + pos);
}
len = 0;
if(data.getListType() > 0) {
if(!hw.ledBlink(LED_GREEN, 1))
@@ -1048,43 +912,35 @@ bool readHanPort() {
void printHanReadError(int pos) {
if(Debug.isActive(RemoteDebug::WARNING)) {
switch(pos) {
case HDLC_BOUNDRY_FLAG_MISSING:
case DATA_PARSE_BOUNDRY_FLAG_MISSING:
debugW("Boundry flag missing");
break;
case HDLC_HCS_ERROR:
case DATA_PARSE_HEADER_CHECKSUM_ERROR:
debugW("Header checksum error");
break;
case HDLC_FCS_ERROR:
case DATA_PARSE_FOOTER_CHECKSUM_ERROR:
debugW("Frame checksum error");
break;
case HDLC_FRAME_INCOMPLETE:
case DATA_PARSE_INCOMPLETE:
debugW("Received frame is incomplete");
break;
case HDLC_ENCRYPTION_CONFIG_MISSING:
debugI("Encryption configuration requested, initializing");
break;
case HDLC_ENCRYPTION_AUTH_FAILED:
case GCM_AUTH_FAILED:
debugW("Decrypt authentication failed");
break;
case HDLC_ENCRYPTION_KEY_FAILED:
case GCM_ENCRYPTION_KEY_FAILED:
debugW("Setting decryption key failed");
break;
case HDLC_ENCRYPTION_DECRYPT_FAILED:
case GCM_DECRYPT_FAILED:
debugW("Decryption failed");
break;
case MBUS_FRAME_LENGTH_NOT_EQUAL:
debugW("Frame length mismatch");
break;
case MBUS_FRAME_INTERMEDIATE_SEGMENT:
case MBUS_FRAME_LAST_SEGMENT:
debugW("Partial frame dropped");
case DATA_PARSE_INTERMEDIATE_SEGMENT:
debugI("Intermediate segment received");
break;
case HDLC_TIMESTAMP_UNKNOWN:
debugW("Frame timestamp is not correctly formatted");
break;
case HDLC_UNKNOWN_DATA:
case DATA_PARSE_UNKNOWN_DATA:
debugW("Unknown data format %02X", hanBuffer[0]);
currentMeterType = 0; // Did not receive valid data, go back to autodetect
break;
default:
debugW("Unspecified error while reading data: %d", pos);
@@ -1239,6 +1095,128 @@ void WiFi_connect() {
}
}
void mqttMessageReceived(String &topic, String &payload) {
debugI("Received message for topic %s", topic.c_str() );
if(meterConfig.source == METER_SOURCE_MQTT) {
DataParserContext ctx = {payload.length()/2};
fromHex(hanBuffer, payload, ctx.length);
uint16_t pos = unwrapData(hanBuffer, ctx);
// TODO: Run through DLMS/DMSR parser and apply AmsData
}
}
int16_t unwrapData(uint8_t *buf, DataParserContext &context) {
int16_t ret;
bool doRet = false;
uint16_t end = BUF_SIZE_HAN;
uint8_t tag = (*buf);
uint8_t lastTag = DATA_TAG_NONE;
while(tag != DATA_TAG_NONE) {
int16_t curLen = context.length;
int8_t res = 0;
switch(tag) {
case DATA_TAG_HDLC:
if(hdlcParser == NULL) hdlcParser = new HDLCParser();
res = hdlcParser->parse(buf, context);
break;
case DATA_TAG_MBUS:
if(mbusParser == NULL) mbusParser = new MBUSParser();
res = mbusParser->parse(buf, context);
break;
case DATA_TAG_GBT:
if(gbtParser == NULL) gbtParser = new GBTParser();
res = gbtParser->parse(buf, context);
break;
case DATA_TAG_GCM:
if(gcmParser == NULL) gcmParser = new GCMParser(meterConfig.encryptionKey, meterConfig.authenticationKey);
res = gcmParser->parse(buf, context);
break;
case DATA_TAG_LLC:
if(llcParser == NULL) llcParser = new LLCParser();
res = llcParser->parse(buf, context);
break;
case DATA_TAG_DLMS:
if(dlmsParser == NULL) dlmsParser = new DLMSParser();
res = dlmsParser->parse(buf, context);
if(res >= 0) doRet = true;
break;
case DATA_TAG_DSMR:
if(dsmrParser == NULL) dsmrParser = new DSMRParser();
res = dsmrParser->parse(buf, context, lastTag != DATA_TAG_NONE);
if(res >= 0) doRet = true;
break;
default:
debugE("Ended up in default case while unwrapping...");
return DATA_PARSE_UNKNOWN_DATA;
}
lastTag = tag;
if(res == DATA_PARSE_INCOMPLETE) {
return res;
}
if(context.length > end) return false;
if(Debug.isActive(RemoteDebug::VERBOSE)) {
switch(tag) {
case DATA_TAG_HDLC:
debugV("HDLC frame:");
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(buf, curLen));
}
break;
case DATA_TAG_MBUS:
debugV("MBUS frame:");
// If MQTT bytestream payload is selected (mqttHandler == NULL), send the payload to MQTT
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), toHex(buf, curLen));
}
break;
case DATA_TAG_GBT:
debugV("GBT frame:");
break;
case DATA_TAG_GCM:
debugV("GCM frame:");
break;
case DATA_TAG_LLC:
debugV("LLC frame:");
break;
case DATA_TAG_DLMS:
debugV("DLMS frame:");
break;
case DATA_TAG_DSMR:
debugV("DSMR frame:");
if(mqttEnabled && mqtt != NULL && mqttHandler == NULL) {
mqtt->publish(topic.c_str(), (char*) buf);
}
break;
}
debugPrint(buf, 0, curLen);
}
if(res == DATA_PARSE_FINAL_SEGMENT) {
if(tag == DATA_TAG_MBUS) {
res = mbusParser->write(buf, context);
}
}
if(res < 0) {
return res;
}
buf += res;
end -= res;
ret += res;
// If we are ready to return, do that
if(doRet) {
context.type = tag;
return ret;
}
// Use start byte of new buffer position as tag for next round in loop
tag = (*buf);
}
debugE("Got to end of unwrap method...");
return DATA_PARSE_UNKNOWN_DATA;
}
unsigned long lastMqttRetry = -10000;
void MQTT_connect() {
MqttConfig mqttConfig;
@@ -1386,6 +1364,13 @@ void MQTT_connect() {
if(mqttHandler != NULL) {
mqttHandler->publishSystem(&hw);
}
// Subscribe to the chosen MQTT topic, if set in configuration
if (strlen(mqttConfig.subscribeTopic) > 0) {
mqtt->onMessage(mqttMessageReceived);
mqtt->subscribe(String(mqttConfig.subscribeTopic) + "/#");
debugI(" Subscribing to [%s]\r\n", mqttConfig.subscribeTopic);
}
} else {
if (Debug.isActive(RemoteDebug::ERROR)) {
debugE("Failed to connect to MQTT: %d", mqtt->lastError());