mirror of
https://github.com/lowobservable/coax.git
synced 2026-05-02 06:26:15 +00:00
Add 3299 protocol support to interface2
This commit is contained in:
@@ -11,7 +11,7 @@ from contextlib import contextmanager
|
||||
from serial import Serial
|
||||
from sliplib import SlipWrapper, ProtocolError
|
||||
|
||||
from .interface import Interface, FrameFormat
|
||||
from .interface import Interface, InterfaceFeature, FrameFormat
|
||||
from .protocol import pack_data_word
|
||||
from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout
|
||||
|
||||
@@ -22,6 +22,8 @@ class SerialInterface(Interface):
|
||||
if serial is None:
|
||||
raise ValueError('Serial port is required')
|
||||
|
||||
super().__init__()
|
||||
|
||||
self.serial = serial
|
||||
|
||||
self.slip_serial = SlipSerial(self.serial)
|
||||
@@ -60,6 +62,13 @@ class SerialInterface(Interface):
|
||||
else:
|
||||
raise InterfaceError(f'Invalid reset response: {message}')
|
||||
|
||||
# Query features, if this is not a legacy firmware.
|
||||
if not self.legacy_firmware_detected:
|
||||
try:
|
||||
self.features = self._get_features()
|
||||
except InterfaceError:
|
||||
pass
|
||||
|
||||
def enter_dfu_mode(self):
|
||||
"""Enter device firmware upgrade mode."""
|
||||
message = bytes([0xf2])
|
||||
@@ -71,18 +80,35 @@ class SerialInterface(Interface):
|
||||
if message[0] != 0x01:
|
||||
raise _convert_error(message)
|
||||
|
||||
def _transmit_receive(self, outbound_frames, response_lengths, timeout):
|
||||
if any(address is not None for (address, _) in outbound_frames):
|
||||
raise NotImplementedError('Interface does not support 3299 protocol')
|
||||
def _get_features(self):
|
||||
"""Get interface features."""
|
||||
message = bytes([0xf0, 0x07])
|
||||
|
||||
self._write_message(message)
|
||||
|
||||
message = self._read_message()
|
||||
|
||||
if message[0] != 0x01:
|
||||
return _convert_error(message)
|
||||
|
||||
known_feature_values = {feature.value for feature in InterfaceFeature}
|
||||
|
||||
features = {InterfaceFeature(value) for value in message[1:] if value in known_feature_values}
|
||||
|
||||
return features
|
||||
|
||||
def _transmit_receive(self, outbound_frames, response_lengths, timeout):
|
||||
if len(response_lengths) != len(outbound_frames):
|
||||
raise ValueError('Response lengths length must equal outbound frames length')
|
||||
|
||||
if any(address is not None for (address, _) in outbound_frames) and InterfaceFeature.PROTOCOL_3299 not in self.features:
|
||||
raise NotImplementedError('Interface does not support 3299 protocol')
|
||||
|
||||
# Pack all messages before sending.
|
||||
timeout_milliseconds = self._calculate_timeout_milliseconds(timeout)
|
||||
|
||||
messages = [_pack_transmit_receive_message(frame, response_length, timeout_milliseconds)
|
||||
for ((_, frame), response_length) in zip(outbound_frames, response_lengths)]
|
||||
messages = [_pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds)
|
||||
for ((address, frame), response_length) in zip(outbound_frames, response_lengths)]
|
||||
|
||||
responses = []
|
||||
|
||||
@@ -158,7 +184,7 @@ def open_serial_interface(serial_port, reset=True):
|
||||
|
||||
yield interface
|
||||
|
||||
def _pack_transmit_receive_message(frame, response_length, timeout_milliseconds):
|
||||
def _pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds):
|
||||
message = bytes([0x06])
|
||||
|
||||
repeat_count = 0
|
||||
@@ -197,6 +223,15 @@ def _pack_transmit_receive_message(frame, response_length, timeout_milliseconds)
|
||||
for byte in frame[1]:
|
||||
bytes_ += struct.pack('<H', pack_data_word(byte))
|
||||
|
||||
if address is not None:
|
||||
if address < 0 or address > 63:
|
||||
raise ValueError('Address must be between 0 and 63')
|
||||
|
||||
if repeat_count > 0:
|
||||
repeat_offset += 1
|
||||
|
||||
bytes_ = struct.pack('<H', 0x8000 | address) + bytes_
|
||||
|
||||
message += struct.pack('>H', (repeat_offset << 15) | repeat_count)
|
||||
message += bytes_
|
||||
message += struct.pack('>HH', response_length, timeout_milliseconds)
|
||||
|
||||
Reference in New Issue
Block a user