Improve read terminal extended ID

This commit is contained in:
Andrew Kay
2023-01-04 18:26:40 -06:00
parent c93b14ed1c
commit 0f09a5a1cf
2 changed files with 42 additions and 22 deletions

View File

@@ -7,9 +7,12 @@ import time
import logging
from more_itertools import chunked
from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \
ProtocolError
TerminalType, LoadAddressCounterLo, LoadSecondaryControl, \
SecondaryControl, ProtocolError
from coax.multiplexer import PORT_MAP_3299
from .interface import ExecuteError
logger = logging.getLogger(__name__)
class Device:
@@ -76,26 +79,28 @@ def format_address(interface, device_address):
def get_ids(interface, device_address, extended_id_retry_attempts=3):
terminal_id = None
extended_id = None
try:
terminal_id = interface.execute(address_commands(device_address, ReadTerminalId()))
except ProtocolError as error:
logger.warning(f'READ_TERMINAL_ID protocol error: {error}')
logger.warning(f'READ_TERMINAL_ID error: {error}')
# Retry the READ_EXTENDED_ID command as it appears to fail frequently on the
# first request - unlike the READ_TERMINAL_ID command,
extended_id = None
for attempt in range(extended_id_retry_attempts):
if terminal_id is not None and terminal_id.type != TerminalType.DFT:
# The READ_EXTENDED_ID command behaves similarly to the READ_MULTIPLE command and
# will terminate when the two low order bits of the address counter are zero. In
# order to read the entire 4 bytes of the extended ID reliably, we need to set
# the secondary control register to disable "big read" and set the address counter
# accordingly.
#
# The address counter will be reset later during device setup.
commands = [LoadSecondaryControl(SecondaryControl(big=False)), LoadAddressCounterLo(0), ReadExtendedId()]
try:
extended_id = interface.execute(address_commands(device_address, ReadExtendedId()))
break
except ProtocolError as error:
logger.warning(f'READ_EXTENDED_ID protocol error: {error}')
time.sleep(0.1)
extended_id = interface.execute(address_commands(device_address, commands))[-1]
except ExecuteError as error:
logger.warning(f'READ_EXTENDED_ID error: {error}')
return (terminal_id, extended_id.hex() if extended_id is not None else None)

View File

@@ -2,7 +2,7 @@ import unittest
from unittest.mock import Mock, patch
from logging import Logger
from coax import TerminalType, Feature, ReadAddressCounterHi, ReadAddressCounterLo, ReadTerminalId, ReadExtendedId, ReadFeatureId, ProtocolError
from coax import TerminalType, Feature, ReadAddressCounterHi, ReadAddressCounterLo, ReadTerminalId, ReadExtendedId, ReadFeatureId, ProtocolError, LoadAddressCounterLo, LoadSecondaryControl
from coax.protocol import TerminalId
import context
@@ -60,6 +60,19 @@ class GetIdsTestCase(unittest.TestCase):
self.addCleanup(patch.stopall)
def test_dft(self):
# Arrange
self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b00000001))]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.DFT)
self.assertIsNone(extended_id)
self.interface.assert_command_not_executed(None, ReadExtendedId)
def test_no_extended_id(self):
# Arrange
self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b11110100))]
@@ -73,6 +86,8 @@ class GetIdsTestCase(unittest.TestCase):
self.assertEqual(terminal_id.keyboard, 15)
self.assertIsNone(extended_id)
self.interface.assert_command_executed(None, ReadExtendedId)
def test_extended_id(self):
# Arrange
self.interface.mock_responses = [
@@ -89,25 +104,25 @@ class GetIdsTestCase(unittest.TestCase):
self.assertEqual(terminal_id.keyboard, 15)
self.assertEqual(extended_id, '01020304')
def test_extended_id_second_attempt(self):
self.interface.assert_command_executed(None, LoadSecondaryControl, lambda command: command.control.big == False)
self.interface.assert_command_executed(None, LoadAddressCounterLo, lambda command: command.address == 0)
def test_terminal_id_error(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, Mock(side_effect=[ProtocolError, bytes.fromhex('01 02 03 04')]))
(None, ReadTerminalId, None, Mock(side_effect=ProtocolError))
]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertEqual(extended_id, '01020304')
self.assertIsNone(terminal_id)
self.assertIsNone(extended_id)
self.logger.warning.assert_called()
def test_extended_id_failed(self):
def test_extended_id_error(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),