diff --git a/oec/device.py b/oec/device.py index f053cd7..6c65cf8 100644 --- a/oec/device.py +++ b/oec/device.py @@ -7,9 +7,12 @@ import time import logging from more_itertools import chunked from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \ - ProtocolError + TerminalType, LoadAddressCounterLo, LoadSecondaryControl, \ + SecondaryControl, ProtocolError from coax.multiplexer import PORT_MAP_3299 +from .interface import ExecuteError + logger = logging.getLogger(__name__) class Device: @@ -76,26 +79,28 @@ def format_address(interface, device_address): def get_ids(interface, device_address, extended_id_retry_attempts=3): terminal_id = None - extended_id = None try: terminal_id = interface.execute(address_commands(device_address, ReadTerminalId())) except ProtocolError as error: - logger.warning(f'READ_TERMINAL_ID protocol error: {error}') + logger.warning(f'READ_TERMINAL_ID error: {error}') - # Retry the READ_EXTENDED_ID command as it appears to fail frequently on the - # first request - unlike the READ_TERMINAL_ID command, extended_id = None - for attempt in range(extended_id_retry_attempts): + if terminal_id is not None and terminal_id.type != TerminalType.DFT: + # The READ_EXTENDED_ID command behaves similarly to the READ_MULTIPLE command and + # will terminate when the two low order bits of the address counter are zero. In + # order to read the entire 4 bytes of the extended ID reliably, we need to set + # the secondary control register to disable "big read" and set the address counter + # accordingly. + # + # The address counter will be reset later during device setup. + commands = [LoadSecondaryControl(SecondaryControl(big=False)), LoadAddressCounterLo(0), ReadExtendedId()] + try: - extended_id = interface.execute(address_commands(device_address, ReadExtendedId())) - - break - except ProtocolError as error: - logger.warning(f'READ_EXTENDED_ID protocol error: {error}') - - time.sleep(0.1) + extended_id = interface.execute(address_commands(device_address, commands))[-1] + except ExecuteError as error: + logger.warning(f'READ_EXTENDED_ID error: {error}') return (terminal_id, extended_id.hex() if extended_id is not None else None) diff --git a/tests/test_device.py b/tests/test_device.py index 0019396..5cd3bf2 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -2,7 +2,7 @@ import unittest from unittest.mock import Mock, patch from logging import Logger -from coax import TerminalType, Feature, ReadAddressCounterHi, ReadAddressCounterLo, ReadTerminalId, ReadExtendedId, ReadFeatureId, ProtocolError +from coax import TerminalType, Feature, ReadAddressCounterHi, ReadAddressCounterLo, ReadTerminalId, ReadExtendedId, ReadFeatureId, ProtocolError, LoadAddressCounterLo, LoadSecondaryControl from coax.protocol import TerminalId import context @@ -60,6 +60,19 @@ class GetIdsTestCase(unittest.TestCase): self.addCleanup(patch.stopall) + def test_dft(self): + # Arrange + self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b00000001))] + + # Act + (terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(terminal_id.type, TerminalType.DFT) + self.assertIsNone(extended_id) + + self.interface.assert_command_not_executed(None, ReadExtendedId) + def test_no_extended_id(self): # Arrange self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b11110100))] @@ -73,6 +86,8 @@ class GetIdsTestCase(unittest.TestCase): self.assertEqual(terminal_id.keyboard, 15) self.assertIsNone(extended_id) + self.interface.assert_command_executed(None, ReadExtendedId) + def test_extended_id(self): # Arrange self.interface.mock_responses = [ @@ -89,25 +104,25 @@ class GetIdsTestCase(unittest.TestCase): self.assertEqual(terminal_id.keyboard, 15) self.assertEqual(extended_id, '01020304') - def test_extended_id_second_attempt(self): + self.interface.assert_command_executed(None, LoadSecondaryControl, lambda command: command.control.big == False) + self.interface.assert_command_executed(None, LoadAddressCounterLo, lambda command: command.address == 0) + + def test_terminal_id_error(self): # Arrange self.interface.mock_responses = [ - (None, ReadTerminalId, None, TerminalId(0b11110100)), - (None, ReadExtendedId, None, Mock(side_effect=[ProtocolError, bytes.fromhex('01 02 03 04')])) + (None, ReadTerminalId, None, Mock(side_effect=ProtocolError)) ] # Act (terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None) # Assert - self.assertEqual(terminal_id.type, TerminalType.CUT) - self.assertEqual(terminal_id.model, 2) - self.assertEqual(terminal_id.keyboard, 15) - self.assertEqual(extended_id, '01020304') + self.assertIsNone(terminal_id) + self.assertIsNone(extended_id) self.logger.warning.assert_called() - def test_extended_id_failed(self): + def test_extended_id_error(self): # Arrange self.interface.mock_responses = [ (None, ReadTerminalId, None, TerminalId(0b11110100)),