From 14e90b9323693e32f1dffd81dfaf04ea8d2c11bb Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Wed, 26 Jun 2019 23:04:23 -0500 Subject: [PATCH] More flexible _execute_read_command --- pycoax/coax/protocol.py | 31 ++++++++++++++++--------------- pycoax/tests/test_protocol.py | 16 +++++++++++++++- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/pycoax/coax/protocol.py b/pycoax/coax/protocol.py index a569fc6..7c726d0 100644 --- a/pycoax/coax/protocol.py +++ b/pycoax/coax/protocol.py @@ -87,18 +87,14 @@ class TerminalId: def poll(interface, **kwargs): """Execute a POLL command.""" - command_word = _pack_command_word(Command.POLL) + response = _execute_read_command(interface, Command.POLL, allow_trta_response=True, + unpack_data_words=False, **kwargs) - response = interface.execute(command_word, **kwargs) - - if len(response) != 1: - raise ProtocolError('Expected 1 word POLL response') + if response is None: + return None word = response[0] - if word == 0: - return None - if PollResponse.is_power_on_reset_complete(word): return PowerOnResetCompletePollResponse(word) @@ -193,27 +189,32 @@ def diagnostic_reset(interface): """Execute a DIAGNOSTIC_RESET command.""" raise NotImplementedError -def _execute_read_command(interface, command, response_length=1): +def _execute_read_command(interface, command, response_length=1, + allow_trta_response=False, trta_value=None, + unpack_data_words=True, **kwargs): """Execute a standard read command.""" command_word = _pack_command_word(command) - response_words = interface.execute(command_word, response_length=response_length) + response = interface.execute(command_word, response_length=response_length) - if len(response_words) != response_length: + if allow_trta_response and len(response) == 1 and response[0] == 0: + return trta_value + + if len(response) != response_length: raise ProtocolError(f'Expected {response_length} word {command.name} response') - return _unpack_data_words(response_words) + return _unpack_data_words(response) if unpack_data_words else response def _execute_write_command(interface, command, data=None): """Execute a standard write command.""" command_word = _pack_command_word(command) - response_words = interface.execute(command_word, data) + response = interface.execute(command_word, data) - if len(response_words) != 1: + if len(response) != 1: raise ProtocolError(f'Expected 1 word {command.name} response') - if response_words[0] != 0: + if response[0] != 0: raise ProtocolError('Expected TR/TA response') def _pack_command_word(command, address=0): diff --git a/pycoax/tests/test_protocol.py b/pycoax/tests/test_protocol.py index 1520f7d..7e6557f 100644 --- a/pycoax/tests/test_protocol.py +++ b/pycoax/tests/test_protocol.py @@ -36,6 +36,20 @@ class ExecuteReadCommandTestCase(unittest.TestCase): # Act and assert self.assertEqual(_execute_read_command(self.interface, Command.READ_TERMINAL_ID), bytes.fromhex('00')) + def test_allow_trta_response(self): + # Arrange + self.interface.execute = Mock(return_value=[0b0000000000]) + + # Act and assert + self.assertEqual(_execute_read_command(self.interface, Command.POLL, allow_trta_response=True, trta_value='TRTA'), 'TRTA') + + def test_disable_unpack_data_words(self): + # Arrange + self.interface.execute = Mock(return_value=[0b1111111110]) + + # Act and assert + self.assertEqual(_execute_read_command(self.interface, Command.POLL, unpack_data_words=False), [0b1111111110]) + def test_unexpected_response_length(self): # Arrange self.interface.execute = Mock(return_value=[]) @@ -63,7 +77,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase): with self.assertRaisesRegex(ProtocolError, 'Expected 1 word WRITE_DATA response'): _execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef')) - def test_not_tr_ta_response(self): + def test_not_trta_response(self): # Arrange self.interface.execute = Mock(return_value=[0b0000000010])