Refactor execute to take command word

This commit is contained in:
Andrew Kay
2019-12-21 09:57:29 -06:00
parent 54efc3845b
commit 543af3cbef
2 changed files with 99 additions and 30 deletions

View File

@@ -118,7 +118,9 @@ class TerminalId:
def poll(interface, **kwargs):
"""Execute a POLL command."""
response = _execute_read_command(interface, Command.POLL, allow_trta_response=True,
command_word = _pack_command_word(Command.POLL)
response = _execute_read_command(interface, command_word, allow_trta_response=True,
unpack_data_words=False, **kwargs)
if response is None:
@@ -136,32 +138,44 @@ def poll(interface, **kwargs):
def poll_ack(interface, **kwargs):
"""Execute a POLL_ACK command."""
_execute_write_command(interface, Command.POLL_ACK, **kwargs)
command_word = _pack_command_word(Command.POLL_ACK)
_execute_write_command(interface, command_word, **kwargs)
def read_status(interface, **kwargs):
"""Execute a READ_STATUS command."""
response = _execute_read_command(interface, Command.READ_STATUS, **kwargs)
command_word = _pack_command_word(Command.READ_STATUS)
response = _execute_read_command(interface, command_word, **kwargs)
return Status(response[0])
def read_terminal_id(interface, **kwargs):
"""Execute a READ_TERMINAL_ID command."""
response = _execute_read_command(interface, Command.READ_TERMINAL_ID, **kwargs)
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
response = _execute_read_command(interface, command_word, **kwargs)
return TerminalId(response[0])
def read_extended_id(interface, **kwargs):
"""Execute a READ_EXTENDED_ID command."""
return _execute_read_command(interface, Command.READ_EXTENDED_ID, 4,
allow_trta_response=True, **kwargs)
command_word = _pack_command_word(Command.READ_EXTENDED_ID)
return _execute_read_command(interface, command_word, 4, allow_trta_response=True,
**kwargs)
def read_address_counter_hi(interface, **kwargs):
"""Execute a READ_ADDRESS_COUNTER_HI command."""
return _execute_read_command(interface, Command.READ_ADDRESS_COUNTER_HI, **kwargs)[0]
command_word = _pack_command_word(Command.READ_ADDRESS_COUNTER_HI)
return _execute_read_command(interface, command_word, **kwargs)[0]
def read_address_counter_lo(interface, **kwargs):
"""Execute a READ_ADDRESS_COUTER_LO command."""
return _execute_read_command(interface, Command.READ_ADDRESS_COUNTER_LO, **kwargs)[0]
command_word = _pack_command_word(Command.READ_ADDRESS_COUNTER_LO)
return _execute_read_command(interface, command_word, **kwargs)[0]
def read_data(interface):
"""Execute a READ_DATA command."""
@@ -189,15 +203,21 @@ def load_mask(interface):
def load_address_counter_hi(interface, address, **kwargs):
"""Execute a LOAD_ADDRESS_COUNTER_HI command."""
_execute_write_command(interface, Command.LOAD_ADDRESS_COUNTER_HI, bytes([address]), **kwargs)
command_word = _pack_command_word(Command.LOAD_ADDRESS_COUNTER_HI)
_execute_write_command(interface, command_word, bytes([address]), **kwargs)
def load_address_counter_lo(interface, address, **kwargs):
"""Execute a LOAD_ADDRESS_COUNTER_LO command."""
_execute_write_command(interface, Command.LOAD_ADDRESS_COUNTER_LO, bytes([address]), **kwargs)
command_word = _pack_command_word(Command.LOAD_ADDRESS_COUNTER_LO)
_execute_write_command(interface, command_word, bytes([address]), **kwargs)
def write_data(interface, data, **kwargs):
"""Execute a WRITE_DATA command."""
_execute_write_command(interface, Command.WRITE_DATA, data, **kwargs)
command_word = _pack_command_word(Command.WRITE_DATA)
_execute_write_command(interface, command_word, data, **kwargs)
def clear(interface):
"""Execute a CLEAR command."""
@@ -223,38 +243,48 @@ def diagnostic_reset(interface):
"""Execute a DIAGNOSTIC_RESET command."""
raise NotImplementedError
def _execute_read_command(interface, command, response_length=1,
def _execute_read_command(interface, command_word, response_length=1,
allow_trta_response=False, trta_value=None,
unpack_data_words=True, **kwargs):
"""Execute a standard read command."""
command_word = _pack_command_word(command)
response = interface.execute(command_word, response_length=response_length, **kwargs)
if allow_trta_response and len(response) == 1 and response[0] == 0:
return trta_value
if len(response) != response_length:
(_, command) = _unpack_command_word(command_word)
raise ProtocolError(f'Expected {response_length} word {command.name} response')
return _unpack_data_words(response) if unpack_data_words else response
def _execute_write_command(interface, command, data=None, **kwargs):
def _execute_write_command(interface, command_word, data=None, **kwargs):
"""Execute a standard write command."""
command_word = _pack_command_word(command)
response = interface.execute(command_word, data, **kwargs)
if len(response) != 1:
(_, command) = _unpack_command_word(command_word)
raise ProtocolError(f'Expected 1 word {command.name} response')
if response[0] != 0:
raise ProtocolError('Expected TR/TA response')
def _pack_command_word(command, address=0):
"""Pack a command and address into a 10-bit command word for the interface."""
"""Pack a command and address into a 10-bit command word."""
return (address << 7) | (command.value << 2) | 0x1
def _unpack_command_word(word):
"""Unpack a 10-bit command word."""
if (word & 0x1) != 1:
raise ProtocolError('Word does not have command bit set')
address = (word >> 7) & 0x7
command = (word >> 2) & 0x1f
return (address, Command(command))
def _unpack_data_words(words, check_parity=False):
"""Unpack the data bytes from 10-bit data words."""
return bytes([_unpack_data_word(word, check_parity=check_parity) for word in words])

View File

@@ -4,7 +4,7 @@ from unittest.mock import Mock
import context
from coax import PollResponse, KeystrokePollResponse, ProtocolError
from coax.protocol import Command, Status, TerminalId, _execute_read_command, _execute_write_command, _pack_command_word, _unpack_data_words, _unpack_data_word
from coax.protocol import Command, Status, TerminalId, _execute_read_command, _execute_write_command, _pack_command_word, _unpack_command_word, _unpack_data_words, _unpack_data_word
class PollResponseTestCase(unittest.TestCase):
def test_is_power_on_reset_complete(self):
@@ -69,39 +69,49 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
def test(self):
# Arrange
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
self.interface.execute = Mock(return_value=[0b0000000010])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, Command.READ_TERMINAL_ID), bytes.fromhex('00'))
self.assertEqual(_execute_read_command(self.interface, command_word), bytes.fromhex('00'))
def test_allow_trta_response(self):
# Arrange
command_word = _pack_command_word(Command.POLL)
self.interface.execute = Mock(return_value=[0b0000000000])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, Command.POLL, allow_trta_response=True, trta_value='TRTA'), 'TRTA')
self.assertEqual(_execute_read_command(self.interface, command_word, allow_trta_response=True, trta_value='TRTA'), 'TRTA')
def test_disable_unpack_data_words(self):
# Arrange
command_word = _pack_command_word(Command.POLL)
self.interface.execute = Mock(return_value=[0b1111111110])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, Command.POLL, unpack_data_words=False), [0b1111111110])
self.assertEqual(_execute_read_command(self.interface, command_word, unpack_data_words=False), [0b1111111110])
def test_unexpected_response_length(self):
# Arrange
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
self.interface.execute = Mock(return_value=[])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word READ_TERMINAL_ID response'):
_execute_read_command(self.interface, Command.READ_TERMINAL_ID)
_execute_read_command(self.interface, command_word)
def test_timeout_is_passed_to_interface(self):
# Arrange
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
self.interface.execute = Mock(return_value=[0b0000000010])
# Act
_execute_read_command(self.interface, Command.READ_TERMINAL_ID, timeout=10)
_execute_read_command(self.interface, command_word, timeout=10)
# Assert
self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10)
@@ -112,43 +122,72 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
def test(self):
# Arrange
command_word = _pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[0b0000000000])
# Act and assert
_execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'))
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'))
def test_unexpected_response_length(self):
# Arrange
command_word = _pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word WRITE_DATA response'):
_execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'))
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'))
def test_not_trta_response(self):
# Arrange
command_word = _pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[0b0000000010])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected TR/TA response'):
_execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'))
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'))
def test_timeout_is_passed_to_interface(self):
# Arrange
command_word = _pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[0b0000000000])
# Assert
_execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'), timeout=10)
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), timeout=10)
# Assert
self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10)
class PackCommandWordTestCase(unittest.TestCase):
def test_without_address(self):
self.assertEqual(_pack_command_word(Command.POLL_ACK), 0b001000101)
self.assertEqual(_pack_command_word(Command.POLL_ACK), 0b0001000101)
def test_with_address(self):
self.assertEqual(_pack_command_word(Command.POLL_ACK, address=3), 0b111000101)
self.assertEqual(_pack_command_word(Command.POLL_ACK, address=7), 0b1111000101)
class UnpackCommandWordTestCase(unittest.TestCase):
def test_without_address(self):
# Act
(address, command) = _unpack_command_word(0b0001000101)
# Assert
self.assertEqual(address, 0)
self.assertEqual(command, Command.POLL_ACK)
def test_with_address(self):
# Act
(address, command) = _unpack_command_word(0b1111000101)
# Assert
self.assertEqual(address, 7)
self.assertEqual(command, Command.POLL_ACK)
def test_command_bit_not_set_error(self):
with self.assertRaisesRegex(ProtocolError, 'Word does not have command bit set'):
_unpack_command_word(0b0001000100)
class UnpackDataWordsTestCase(unittest.TestCase):
def test(self):