mirror of
https://github.com/lowobservable/coax.git
synced 2026-01-19 01:47:35 +00:00
Support new interface reset response
This commit is contained in:
parent
772d4d3538
commit
f4d8c01024
@ -19,6 +19,9 @@ class SerialInterface(Interface):
|
||||
|
||||
self.slip_serial = SlipSerial(self.serial)
|
||||
|
||||
self.legacy_firmware_detected = None
|
||||
self.legacy_firmware_version = None
|
||||
|
||||
def reset(self):
|
||||
original_serial_timeout = self.serial.timeout
|
||||
|
||||
@ -38,13 +41,17 @@ class SerialInterface(Interface):
|
||||
if message[0] != 0x01:
|
||||
raise _convert_error(message)
|
||||
|
||||
if len(message) != 4:
|
||||
if message[1:] == b'\x32\x70':
|
||||
self.legacy_firmware_detected = False
|
||||
self.legacy_firmware_version = None
|
||||
elif len(message) == 4:
|
||||
(major, minor, patch) = struct.unpack('BBB', message[1:])
|
||||
|
||||
self.legacy_firmware_detected = True
|
||||
self.legacy_firmware_version = '{}.{}.{}'.format(major, minor, patch)
|
||||
else:
|
||||
raise InterfaceError(f'Invalid reset response: {message}')
|
||||
|
||||
(major, minor, patch) = struct.unpack('BBB', message[1:])
|
||||
|
||||
return '{}.{}.{}'.format(major, minor, patch)
|
||||
|
||||
def transmit_receive(self, transmit_words, transmit_repeat_count=None,
|
||||
transmit_repeat_offset=1, receive_length=None,
|
||||
receive_timeout=None):
|
||||
|
||||
@ -7,6 +7,7 @@ with create_serial() as serial:
|
||||
|
||||
print('Resetting interface...')
|
||||
|
||||
version = interface.reset()
|
||||
interface.reset()
|
||||
|
||||
print(f'Firmware version is {version}')
|
||||
if interface.legacy_firmware_detected:
|
||||
print(f'Firmware version is {interface.legacy_firmware_version}')
|
||||
|
||||
@ -28,9 +28,10 @@ def create_interface(serial, reset=True, poll_flush=True):
|
||||
if reset:
|
||||
print('Resetting interface...')
|
||||
|
||||
version = interface.reset()
|
||||
interface.reset()
|
||||
|
||||
print(f'Firmware version is {version}')
|
||||
if interface.legacy_firmware_detected:
|
||||
print(f'Firmware version is {interface.legacy_firmware_version}')
|
||||
|
||||
if poll_flush:
|
||||
print('POLLing...')
|
||||
|
||||
@ -15,7 +15,7 @@ class SerialInterfaceResetTestCase(unittest.TestCase):
|
||||
self.interface = SerialInterface(self.serial)
|
||||
|
||||
self.interface._write_message = Mock()
|
||||
self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03'))
|
||||
self.interface._read_message = Mock(return_value=bytes.fromhex('01 32 70'))
|
||||
|
||||
def test_message_is_sent(self):
|
||||
# Act
|
||||
@ -24,8 +24,24 @@ class SerialInterfaceResetTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.interface._write_message.assert_called_with(bytes.fromhex('01'))
|
||||
|
||||
def test_version_is_formatted_correctly(self):
|
||||
self.assertEqual(self.interface.reset(), '1.2.3')
|
||||
def test_non_legacy_response_is_handled_correctly(self):
|
||||
# Act
|
||||
self.interface.reset()
|
||||
|
||||
# Assert
|
||||
self.assertFalse(self.interface.legacy_firmware_detected)
|
||||
self.assertIsNone(self.interface.legacy_firmware_version)
|
||||
|
||||
def test_legacy_response_is_handled_correctly(self):
|
||||
# Arrange
|
||||
self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03'))
|
||||
|
||||
# Act
|
||||
self.interface.reset()
|
||||
|
||||
# Assert
|
||||
self.assertTrue(self.interface.legacy_firmware_detected)
|
||||
self.assertEqual(self.interface.legacy_firmware_version, '1.2.3')
|
||||
|
||||
def test_timeout_is_restored_after_reset(self):
|
||||
# Arrange
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user