From 543af3cbef48ed2b3dc65840f6f54cfcfae3939c Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Sat, 21 Dec 2019 09:57:29 -0600 Subject: [PATCH] Refactor execute to take command word --- pycoax/coax/protocol.py | 66 +++++++++++++++++++++++++---------- pycoax/tests/test_protocol.py | 63 ++++++++++++++++++++++++++------- 2 files changed, 99 insertions(+), 30 deletions(-) diff --git a/pycoax/coax/protocol.py b/pycoax/coax/protocol.py index 919760c..847b241 100644 --- a/pycoax/coax/protocol.py +++ b/pycoax/coax/protocol.py @@ -118,7 +118,9 @@ class TerminalId: def poll(interface, **kwargs): """Execute a POLL command.""" - response = _execute_read_command(interface, Command.POLL, allow_trta_response=True, + command_word = _pack_command_word(Command.POLL) + + response = _execute_read_command(interface, command_word, allow_trta_response=True, unpack_data_words=False, **kwargs) if response is None: @@ -136,32 +138,44 @@ def poll(interface, **kwargs): def poll_ack(interface, **kwargs): """Execute a POLL_ACK command.""" - _execute_write_command(interface, Command.POLL_ACK, **kwargs) + command_word = _pack_command_word(Command.POLL_ACK) + + _execute_write_command(interface, command_word, **kwargs) def read_status(interface, **kwargs): """Execute a READ_STATUS command.""" - response = _execute_read_command(interface, Command.READ_STATUS, **kwargs) + command_word = _pack_command_word(Command.READ_STATUS) + + response = _execute_read_command(interface, command_word, **kwargs) return Status(response[0]) def read_terminal_id(interface, **kwargs): """Execute a READ_TERMINAL_ID command.""" - response = _execute_read_command(interface, Command.READ_TERMINAL_ID, **kwargs) + command_word = _pack_command_word(Command.READ_TERMINAL_ID) + + response = _execute_read_command(interface, command_word, **kwargs) return TerminalId(response[0]) def read_extended_id(interface, **kwargs): """Execute a READ_EXTENDED_ID command.""" - return _execute_read_command(interface, Command.READ_EXTENDED_ID, 4, - allow_trta_response=True, **kwargs) + command_word = _pack_command_word(Command.READ_EXTENDED_ID) + + return _execute_read_command(interface, command_word, 4, allow_trta_response=True, + **kwargs) def read_address_counter_hi(interface, **kwargs): """Execute a READ_ADDRESS_COUNTER_HI command.""" - return _execute_read_command(interface, Command.READ_ADDRESS_COUNTER_HI, **kwargs)[0] + command_word = _pack_command_word(Command.READ_ADDRESS_COUNTER_HI) + + return _execute_read_command(interface, command_word, **kwargs)[0] def read_address_counter_lo(interface, **kwargs): """Execute a READ_ADDRESS_COUTER_LO command.""" - return _execute_read_command(interface, Command.READ_ADDRESS_COUNTER_LO, **kwargs)[0] + command_word = _pack_command_word(Command.READ_ADDRESS_COUNTER_LO) + + return _execute_read_command(interface, command_word, **kwargs)[0] def read_data(interface): """Execute a READ_DATA command.""" @@ -189,15 +203,21 @@ def load_mask(interface): def load_address_counter_hi(interface, address, **kwargs): """Execute a LOAD_ADDRESS_COUNTER_HI command.""" - _execute_write_command(interface, Command.LOAD_ADDRESS_COUNTER_HI, bytes([address]), **kwargs) + command_word = _pack_command_word(Command.LOAD_ADDRESS_COUNTER_HI) + + _execute_write_command(interface, command_word, bytes([address]), **kwargs) def load_address_counter_lo(interface, address, **kwargs): """Execute a LOAD_ADDRESS_COUNTER_LO command.""" - _execute_write_command(interface, Command.LOAD_ADDRESS_COUNTER_LO, bytes([address]), **kwargs) + command_word = _pack_command_word(Command.LOAD_ADDRESS_COUNTER_LO) + + _execute_write_command(interface, command_word, bytes([address]), **kwargs) def write_data(interface, data, **kwargs): """Execute a WRITE_DATA command.""" - _execute_write_command(interface, Command.WRITE_DATA, data, **kwargs) + command_word = _pack_command_word(Command.WRITE_DATA) + + _execute_write_command(interface, command_word, data, **kwargs) def clear(interface): """Execute a CLEAR command.""" @@ -223,38 +243,48 @@ def diagnostic_reset(interface): """Execute a DIAGNOSTIC_RESET command.""" raise NotImplementedError -def _execute_read_command(interface, command, response_length=1, +def _execute_read_command(interface, command_word, response_length=1, allow_trta_response=False, trta_value=None, unpack_data_words=True, **kwargs): """Execute a standard read command.""" - command_word = _pack_command_word(command) - response = interface.execute(command_word, response_length=response_length, **kwargs) if allow_trta_response and len(response) == 1 and response[0] == 0: return trta_value if len(response) != response_length: + (_, command) = _unpack_command_word(command_word) + raise ProtocolError(f'Expected {response_length} word {command.name} response') return _unpack_data_words(response) if unpack_data_words else response -def _execute_write_command(interface, command, data=None, **kwargs): +def _execute_write_command(interface, command_word, data=None, **kwargs): """Execute a standard write command.""" - command_word = _pack_command_word(command) - response = interface.execute(command_word, data, **kwargs) if len(response) != 1: + (_, command) = _unpack_command_word(command_word) + raise ProtocolError(f'Expected 1 word {command.name} response') if response[0] != 0: raise ProtocolError('Expected TR/TA response') def _pack_command_word(command, address=0): - """Pack a command and address into a 10-bit command word for the interface.""" + """Pack a command and address into a 10-bit command word.""" return (address << 7) | (command.value << 2) | 0x1 +def _unpack_command_word(word): + """Unpack a 10-bit command word.""" + if (word & 0x1) != 1: + raise ProtocolError('Word does not have command bit set') + + address = (word >> 7) & 0x7 + command = (word >> 2) & 0x1f + + return (address, Command(command)) + def _unpack_data_words(words, check_parity=False): """Unpack the data bytes from 10-bit data words.""" return bytes([_unpack_data_word(word, check_parity=check_parity) for word in words]) diff --git a/pycoax/tests/test_protocol.py b/pycoax/tests/test_protocol.py index fa51745..2b3a6ce 100644 --- a/pycoax/tests/test_protocol.py +++ b/pycoax/tests/test_protocol.py @@ -4,7 +4,7 @@ from unittest.mock import Mock import context from coax import PollResponse, KeystrokePollResponse, ProtocolError -from coax.protocol import Command, Status, TerminalId, _execute_read_command, _execute_write_command, _pack_command_word, _unpack_data_words, _unpack_data_word +from coax.protocol import Command, Status, TerminalId, _execute_read_command, _execute_write_command, _pack_command_word, _unpack_command_word, _unpack_data_words, _unpack_data_word class PollResponseTestCase(unittest.TestCase): def test_is_power_on_reset_complete(self): @@ -69,39 +69,49 @@ class ExecuteReadCommandTestCase(unittest.TestCase): def test(self): # Arrange + command_word = _pack_command_word(Command.READ_TERMINAL_ID) + self.interface.execute = Mock(return_value=[0b0000000010]) # Act and assert - self.assertEqual(_execute_read_command(self.interface, Command.READ_TERMINAL_ID), bytes.fromhex('00')) + self.assertEqual(_execute_read_command(self.interface, command_word), bytes.fromhex('00')) def test_allow_trta_response(self): # Arrange + command_word = _pack_command_word(Command.POLL) + self.interface.execute = Mock(return_value=[0b0000000000]) # Act and assert - self.assertEqual(_execute_read_command(self.interface, Command.POLL, allow_trta_response=True, trta_value='TRTA'), 'TRTA') + self.assertEqual(_execute_read_command(self.interface, command_word, allow_trta_response=True, trta_value='TRTA'), 'TRTA') def test_disable_unpack_data_words(self): # Arrange + command_word = _pack_command_word(Command.POLL) + self.interface.execute = Mock(return_value=[0b1111111110]) # Act and assert - self.assertEqual(_execute_read_command(self.interface, Command.POLL, unpack_data_words=False), [0b1111111110]) + self.assertEqual(_execute_read_command(self.interface, command_word, unpack_data_words=False), [0b1111111110]) def test_unexpected_response_length(self): # Arrange + command_word = _pack_command_word(Command.READ_TERMINAL_ID) + self.interface.execute = Mock(return_value=[]) # Act and assert with self.assertRaisesRegex(ProtocolError, 'Expected 1 word READ_TERMINAL_ID response'): - _execute_read_command(self.interface, Command.READ_TERMINAL_ID) + _execute_read_command(self.interface, command_word) def test_timeout_is_passed_to_interface(self): # Arrange + command_word = _pack_command_word(Command.READ_TERMINAL_ID) + self.interface.execute = Mock(return_value=[0b0000000010]) # Act - _execute_read_command(self.interface, Command.READ_TERMINAL_ID, timeout=10) + _execute_read_command(self.interface, command_word, timeout=10) # Assert self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10) @@ -112,43 +122,72 @@ class ExecuteWriteCommandTestCase(unittest.TestCase): def test(self): # Arrange + command_word = _pack_command_word(Command.WRITE_DATA) + self.interface.execute = Mock(return_value=[0b0000000000]) # Act and assert - _execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef')) + _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef')) def test_unexpected_response_length(self): # Arrange + command_word = _pack_command_word(Command.WRITE_DATA) + self.interface.execute = Mock(return_value=[]) # Act and assert with self.assertRaisesRegex(ProtocolError, 'Expected 1 word WRITE_DATA response'): - _execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef')) + _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef')) def test_not_trta_response(self): # Arrange + command_word = _pack_command_word(Command.WRITE_DATA) + self.interface.execute = Mock(return_value=[0b0000000010]) # Act and assert with self.assertRaisesRegex(ProtocolError, 'Expected TR/TA response'): - _execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef')) + _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef')) def test_timeout_is_passed_to_interface(self): # Arrange + command_word = _pack_command_word(Command.WRITE_DATA) + self.interface.execute = Mock(return_value=[0b0000000000]) # Assert - _execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'), timeout=10) + _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), timeout=10) # Assert self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10) class PackCommandWordTestCase(unittest.TestCase): def test_without_address(self): - self.assertEqual(_pack_command_word(Command.POLL_ACK), 0b001000101) + self.assertEqual(_pack_command_word(Command.POLL_ACK), 0b0001000101) def test_with_address(self): - self.assertEqual(_pack_command_word(Command.POLL_ACK, address=3), 0b111000101) + self.assertEqual(_pack_command_word(Command.POLL_ACK, address=7), 0b1111000101) + +class UnpackCommandWordTestCase(unittest.TestCase): + def test_without_address(self): + # Act + (address, command) = _unpack_command_word(0b0001000101) + + # Assert + self.assertEqual(address, 0) + self.assertEqual(command, Command.POLL_ACK) + + def test_with_address(self): + # Act + (address, command) = _unpack_command_word(0b1111000101) + + # Assert + self.assertEqual(address, 7) + self.assertEqual(command, Command.POLL_ACK) + + def test_command_bit_not_set_error(self): + with self.assertRaisesRegex(ProtocolError, 'Word does not have command bit set'): + _unpack_command_word(0b0001000100) class UnpackDataWordsTestCase(unittest.TestCase): def test(self):