mirror of
https://github.com/UtilitechAS/amsreader-firmware.git
synced 2026-01-26 04:11:18 +00:00
Updated the MQTT process to make it more secure, updated the updater
This commit is contained in:
@@ -25,7 +25,7 @@ import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
DEFAULT_CHANNEL = "stable"
|
||||
DEFAULT_OUTPUT = Path("dist")
|
||||
@@ -108,6 +108,82 @@ def compute_md5(path: Path) -> str:
|
||||
return hash_md5.hexdigest()
|
||||
|
||||
|
||||
MQTT_FIELD_SPECS: Tuple[Tuple[str, str, type], ...] = (
|
||||
("host", "MQTT_DEFAULT_HOST", str),
|
||||
("port", "MQTT_DEFAULT_PORT", int),
|
||||
("username", "MQTT_DEFAULT_USERNAME", str),
|
||||
("password", "MQTT_DEFAULT_PASSWORD", str),
|
||||
("client_id", "MQTT_DEFAULT_CLIENT_ID", str),
|
||||
("publish_topic", "MQTT_DEFAULT_PUBLISH_TOPIC", str),
|
||||
("subscribe_topic", "MQTT_DEFAULT_SUBSCRIBE_TOPIC", str),
|
||||
("payload_format", "MQTT_DEFAULT_PAYLOAD_FORMAT", int),
|
||||
("ssl", "MQTT_DEFAULT_SSL", bool),
|
||||
("state_update", "MQTT_DEFAULT_STATE_UPDATE", bool),
|
||||
("state_update_interval", "MQTT_DEFAULT_STATE_UPDATE_INTERVAL", int),
|
||||
("timeout", "MQTT_DEFAULT_TIMEOUT", int),
|
||||
("keepalive", "MQTT_DEFAULT_KEEPALIVE", int),
|
||||
)
|
||||
|
||||
|
||||
def _parse_env_file(path: Path) -> Dict[str, str]:
|
||||
data: Dict[str, str] = {}
|
||||
if not path.exists():
|
||||
return data
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
for raw_line in handle:
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
key, sep, value = line.partition("=")
|
||||
if not sep:
|
||||
continue
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
|
||||
value = value[1:-1]
|
||||
data[key] = value
|
||||
return data
|
||||
|
||||
|
||||
def _to_bool(value: str) -> bool:
|
||||
truthy = {"1", "true", "yes", "on"}
|
||||
falsy = {"0", "false", "no", "off"}
|
||||
lowered = value.lower()
|
||||
if lowered in truthy:
|
||||
return True
|
||||
if lowered in falsy:
|
||||
return False
|
||||
raise ValueError(f"Invalid boolean literal: {value}")
|
||||
|
||||
|
||||
def load_mqtt_defaults(project_dir: Path) -> Dict[str, Any]:
|
||||
env_path = project_dir / ".env"
|
||||
file_values = _parse_env_file(env_path)
|
||||
manifest_values: Dict[str, Any] = {}
|
||||
|
||||
for manifest_key, env_key, value_type in MQTT_FIELD_SPECS:
|
||||
raw_value = os.getenv(env_key)
|
||||
if raw_value is None:
|
||||
raw_value = file_values.get(env_key)
|
||||
if raw_value is None or raw_value == "":
|
||||
continue
|
||||
|
||||
try:
|
||||
if value_type is bool:
|
||||
converted: Any = _to_bool(raw_value)
|
||||
elif value_type is int:
|
||||
converted = int(raw_value, 0)
|
||||
else:
|
||||
converted = raw_value
|
||||
except ValueError as exc:
|
||||
print(f"WARN: Skipping MQTT field {env_key}: {exc}")
|
||||
continue
|
||||
|
||||
manifest_values[manifest_key] = converted
|
||||
|
||||
return manifest_values
|
||||
|
||||
|
||||
def package_environment(
|
||||
env: str,
|
||||
chip: str,
|
||||
@@ -116,6 +192,7 @@ def package_environment(
|
||||
version: str,
|
||||
output_dir: Path,
|
||||
published_at: str,
|
||||
mqtt_defaults: Optional[Dict[str, Any]] = None,
|
||||
) -> Optional[Dict[str, str]]:
|
||||
firmware_path = build_dir / env / "firmware.bin"
|
||||
if not firmware_path.exists():
|
||||
@@ -144,6 +221,8 @@ def package_environment(
|
||||
"published_at": published_at,
|
||||
"env": env,
|
||||
}
|
||||
if mqtt_defaults:
|
||||
manifest["mqtt"] = mqtt_defaults
|
||||
manifest_path.write_text(json.dumps(manifest, indent=2))
|
||||
|
||||
# Optional: bundle flashing zip if produced by scripts/mkzip.sh
|
||||
@@ -249,6 +328,11 @@ def main() -> None:
|
||||
else datetime.now(timezone.utc).isoformat(timespec="seconds")
|
||||
)
|
||||
|
||||
project_dir = Path.cwd()
|
||||
mqtt_defaults = load_mqtt_defaults(project_dir)
|
||||
if mqtt_defaults:
|
||||
print("Including MQTT defaults in manifest for release packaging")
|
||||
|
||||
summaries = []
|
||||
for env in envs:
|
||||
chip = ENV_TO_CHIP[env]
|
||||
@@ -260,6 +344,7 @@ def main() -> None:
|
||||
version=version,
|
||||
output_dir=args.output,
|
||||
published_at=published_at,
|
||||
mqtt_defaults=mqtt_defaults,
|
||||
)
|
||||
if summary:
|
||||
summaries.append(summary)
|
||||
|
||||
Reference in New Issue
Block a user