mirror of
https://github.com/lowobservable/coax.git
synced 2026-02-28 09:37:40 +00:00
Merge branch 'master' into i2
This commit is contained in:
@@ -167,10 +167,10 @@ class SecondaryControl:
|
||||
|
||||
def poll(interface, action=PollAction.NONE, **kwargs):
|
||||
"""Execute a POLL command."""
|
||||
command_word = (action.value << 8) | _pack_command_word(Command.POLL)
|
||||
command_word = (action.value << 8) | pack_command_word(Command.POLL)
|
||||
|
||||
response = _execute_read_command(interface, command_word, allow_trta_response=True,
|
||||
unpack_data_words=False, **kwargs)
|
||||
unpack=False, **kwargs)
|
||||
|
||||
if response is None:
|
||||
return None
|
||||
@@ -187,13 +187,13 @@ def poll(interface, action=PollAction.NONE, **kwargs):
|
||||
|
||||
def poll_ack(interface, **kwargs):
|
||||
"""Execute a POLL_ACK command."""
|
||||
command_word = _pack_command_word(Command.POLL_ACK)
|
||||
command_word = pack_command_word(Command.POLL_ACK)
|
||||
|
||||
_execute_write_command(interface, command_word, **kwargs)
|
||||
|
||||
def read_status(interface, **kwargs):
|
||||
"""Execute a READ_STATUS command."""
|
||||
command_word = _pack_command_word(Command.READ_STATUS)
|
||||
command_word = pack_command_word(Command.READ_STATUS)
|
||||
|
||||
response = _execute_read_command(interface, command_word, **kwargs)
|
||||
|
||||
@@ -201,7 +201,7 @@ def read_status(interface, **kwargs):
|
||||
|
||||
def read_terminal_id(interface, **kwargs):
|
||||
"""Execute a READ_TERMINAL_ID command."""
|
||||
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
|
||||
command_word = pack_command_word(Command.READ_TERMINAL_ID)
|
||||
|
||||
response = _execute_read_command(interface, command_word, **kwargs)
|
||||
|
||||
@@ -209,99 +209,99 @@ def read_terminal_id(interface, **kwargs):
|
||||
|
||||
def read_extended_id(interface, **kwargs):
|
||||
"""Execute a READ_EXTENDED_ID command."""
|
||||
command_word = _pack_command_word(Command.READ_EXTENDED_ID)
|
||||
command_word = pack_command_word(Command.READ_EXTENDED_ID)
|
||||
|
||||
return _execute_read_command(interface, command_word, 4, allow_trta_response=True,
|
||||
**kwargs)
|
||||
|
||||
def read_address_counter_hi(interface, **kwargs):
|
||||
"""Execute a READ_ADDRESS_COUNTER_HI command."""
|
||||
command_word = _pack_command_word(Command.READ_ADDRESS_COUNTER_HI)
|
||||
command_word = pack_command_word(Command.READ_ADDRESS_COUNTER_HI)
|
||||
|
||||
return _execute_read_command(interface, command_word, **kwargs)[0]
|
||||
|
||||
def read_address_counter_lo(interface, **kwargs):
|
||||
"""Execute a READ_ADDRESS_COUTER_LO command."""
|
||||
command_word = _pack_command_word(Command.READ_ADDRESS_COUNTER_LO)
|
||||
command_word = pack_command_word(Command.READ_ADDRESS_COUNTER_LO)
|
||||
|
||||
return _execute_read_command(interface, command_word, **kwargs)[0]
|
||||
|
||||
def read_data(interface, **kwargs):
|
||||
"""Execute a READ_DATA command."""
|
||||
command_word = _pack_command_word(Command.READ_DATA)
|
||||
command_word = pack_command_word(Command.READ_DATA)
|
||||
|
||||
return _execute_read_command(interface, command_word, **kwargs)
|
||||
|
||||
def read_multiple(interface, **kwargs):
|
||||
"""Execute a READ_MULTIPLE command."""
|
||||
command_word = _pack_command_word(Command.READ_MULTIPLE)
|
||||
command_word = pack_command_word(Command.READ_MULTIPLE)
|
||||
|
||||
return _execute_read_command(interface, command_word, 32,
|
||||
validate_response_length=False, **kwargs)
|
||||
|
||||
def reset(interface, **kwargs):
|
||||
"""Execute a RESET command."""
|
||||
command_word = _pack_command_word(Command.RESET)
|
||||
command_word = pack_command_word(Command.RESET)
|
||||
|
||||
_execute_write_command(interface, command_word, **kwargs)
|
||||
|
||||
def load_control_register(interface, control, **kwargs):
|
||||
"""Execute a LOAD_CONTROL_REGISTER command."""
|
||||
command_word = _pack_command_word(Command.LOAD_CONTROL_REGISTER)
|
||||
command_word = pack_command_word(Command.LOAD_CONTROL_REGISTER)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([control.value]), **kwargs)
|
||||
|
||||
def load_secondary_control(interface, control, **kwargs):
|
||||
"""Execute a LOAD_SECONDARY_CONTROL command."""
|
||||
command_word = _pack_command_word(Command.LOAD_SECONDARY_CONTROL)
|
||||
command_word = pack_command_word(Command.LOAD_SECONDARY_CONTROL)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([control.value]), **kwargs)
|
||||
|
||||
def load_mask(interface, mask, **kwargs):
|
||||
"""Execute a LOAD_MASK command."""
|
||||
command_word = _pack_command_word(Command.LOAD_MASK)
|
||||
command_word = pack_command_word(Command.LOAD_MASK)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([mask]), **kwargs)
|
||||
|
||||
def load_address_counter_hi(interface, address, **kwargs):
|
||||
"""Execute a LOAD_ADDRESS_COUNTER_HI command."""
|
||||
command_word = _pack_command_word(Command.LOAD_ADDRESS_COUNTER_HI)
|
||||
command_word = pack_command_word(Command.LOAD_ADDRESS_COUNTER_HI)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([address]), **kwargs)
|
||||
|
||||
def load_address_counter_lo(interface, address, **kwargs):
|
||||
"""Execute a LOAD_ADDRESS_COUNTER_LO command."""
|
||||
command_word = _pack_command_word(Command.LOAD_ADDRESS_COUNTER_LO)
|
||||
command_word = pack_command_word(Command.LOAD_ADDRESS_COUNTER_LO)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([address]), **kwargs)
|
||||
|
||||
def write_data(interface, data, **kwargs):
|
||||
"""Execute a WRITE_DATA command."""
|
||||
command_word = _pack_command_word(Command.WRITE_DATA)
|
||||
command_word = pack_command_word(Command.WRITE_DATA)
|
||||
|
||||
_execute_write_command(interface, command_word, data, **kwargs)
|
||||
|
||||
def clear(interface, pattern, **kwargs):
|
||||
"""Execute a CLEAR command."""
|
||||
command_word = _pack_command_word(Command.CLEAR)
|
||||
command_word = pack_command_word(Command.CLEAR)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([pattern]), **kwargs)
|
||||
|
||||
def search_forward(interface, pattern, **kwargs):
|
||||
"""Execute a SEARCH_FORWARD command."""
|
||||
command_word = _pack_command_word(Command.SEARCH_FORWARD)
|
||||
command_word = pack_command_word(Command.SEARCH_FORWARD)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([pattern]), **kwargs)
|
||||
|
||||
def search_backward(interface, pattern, **kwargs):
|
||||
"""Execute a SEARCH_BACKWARD command."""
|
||||
command_word = _pack_command_word(Command.SEARCH_BACKWARD)
|
||||
command_word = pack_command_word(Command.SEARCH_BACKWARD)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([pattern]), **kwargs)
|
||||
|
||||
def insert_byte(interface, byte, **kwargs):
|
||||
"""Execute a INSERT_BYTE command."""
|
||||
command_word = _pack_command_word(Command.INSERT_BYTE)
|
||||
command_word = pack_command_word(Command.INSERT_BYTE)
|
||||
|
||||
_execute_write_command(interface, command_word, bytes([byte]), **kwargs)
|
||||
|
||||
@@ -313,55 +313,36 @@ def diagnostic_reset(interface):
|
||||
"""Execute a DIAGNOSTIC_RESET command."""
|
||||
raise NotImplementedError
|
||||
|
||||
def _execute_read_command(interface, command_word, response_length=1,
|
||||
validate_response_length=True, allow_trta_response=False,
|
||||
trta_value=None, unpack_data_words=True, **kwargs):
|
||||
"""Execute a standard read command."""
|
||||
response = interface.execute(command_word, response_length=response_length, **kwargs)
|
||||
def pack_command_word(command):
|
||||
"""Pack a command into a 10-bit command word."""
|
||||
return (command.value << 2) | 0x1
|
||||
|
||||
if allow_trta_response and len(response) == 1 and response[0] == 0:
|
||||
return trta_value
|
||||
def is_command_word(word):
|
||||
"""Is command word bit set?"""
|
||||
return (word & 0x1) == 1
|
||||
|
||||
if validate_response_length and len(response) != response_length:
|
||||
(_, command) = _unpack_command_word(command_word)
|
||||
|
||||
raise ProtocolError(f'Expected {response_length} word {command.name} response')
|
||||
|
||||
return _unpack_data_words(response) if unpack_data_words else response
|
||||
|
||||
def _execute_write_command(interface, command_word, data=None, **kwargs):
|
||||
"""Execute a standard write command."""
|
||||
response = interface.execute(command_word, data, **kwargs)
|
||||
|
||||
if len(response) != 1:
|
||||
(_, command) = _unpack_command_word(command_word)
|
||||
|
||||
raise ProtocolError(f'Expected 1 word {command.name} response')
|
||||
|
||||
if response[0] != 0:
|
||||
raise ProtocolError('Expected TR/TA response')
|
||||
|
||||
def _pack_command_word(command, address=0):
|
||||
"""Pack a command and address into a 10-bit command word."""
|
||||
return (address << 7) | (command.value << 2) | 0x1
|
||||
|
||||
def _unpack_command_word(word):
|
||||
def unpack_command_word(word):
|
||||
"""Unpack a 10-bit command word."""
|
||||
if (word & 0x1) != 1:
|
||||
if not is_command_word(word):
|
||||
raise ProtocolError('Word does not have command bit set')
|
||||
|
||||
address = (word >> 7) & 0x7
|
||||
command = (word >> 2) & 0x1f
|
||||
|
||||
return (address, Command(command))
|
||||
return Command(command)
|
||||
|
||||
def _unpack_data_words(words, check_parity=False):
|
||||
"""Unpack the data bytes from 10-bit data words."""
|
||||
return bytes([_unpack_data_word(word, check_parity=check_parity) for word in words])
|
||||
def pack_data_word(byte, set_parity=True):
|
||||
"""Pack a data byte into a 10-bit data word."""
|
||||
parity = odd_parity(byte) if set_parity else 0
|
||||
|
||||
def _unpack_data_word(word, check_parity=False):
|
||||
return (byte << 2) | (parity << 1)
|
||||
|
||||
def is_data_word(word):
|
||||
"""Is data word bit set?"""
|
||||
return (word & 0x1) == 0
|
||||
|
||||
def unpack_data_word(word, check_parity=False):
|
||||
"""Unpack the data byte from a 10-bit data word."""
|
||||
if (word & 0x1) != 0:
|
||||
if not is_data_word(word):
|
||||
raise ProtocolError('Word does not have data bit set')
|
||||
|
||||
byte = (word >> 2) & 0xff
|
||||
@@ -371,3 +352,39 @@ def _unpack_data_word(word, check_parity=False):
|
||||
raise ProtocolError('Parity error')
|
||||
|
||||
return byte
|
||||
|
||||
def pack_data_words(bytes_, set_parity=True):
|
||||
"""Pack data bytes into 10-bit data words."""
|
||||
return [pack_data_word(byte, set_parity=set_parity) for byte in bytes_]
|
||||
|
||||
def unpack_data_words(words, check_parity=False):
|
||||
"""Unpack the data bytes from 10-bit data words."""
|
||||
return bytes([unpack_data_word(word, check_parity=check_parity) for word in words])
|
||||
|
||||
def _execute_read_command(interface, command_word, response_length=1,
|
||||
validate_response_length=True, allow_trta_response=False,
|
||||
trta_value=None, unpack=True, **kwargs):
|
||||
"""Execute a standard read command."""
|
||||
response = interface.execute(command_word, response_length=response_length, **kwargs)
|
||||
|
||||
if allow_trta_response and len(response) == 1 and response[0] == 0:
|
||||
return trta_value
|
||||
|
||||
if validate_response_length and len(response) != response_length:
|
||||
command = unpack_command_word(command_word)
|
||||
|
||||
raise ProtocolError(f'Expected {response_length} word {command.name} response')
|
||||
|
||||
return unpack_data_words(response) if unpack else response
|
||||
|
||||
def _execute_write_command(interface, command_word, data=None, **kwargs):
|
||||
"""Execute a standard write command."""
|
||||
response = interface.execute(command_word, data, **kwargs)
|
||||
|
||||
if len(response) != 1:
|
||||
command = unpack_command_word(command_word)
|
||||
|
||||
raise ProtocolError(f'Expected 1 word {command.name} response')
|
||||
|
||||
if response[0] != 0:
|
||||
raise ProtocolError('Expected TR/TA response')
|
||||
|
||||
@@ -4,7 +4,7 @@ from unittest.mock import Mock
|
||||
import context
|
||||
|
||||
from coax import PollResponse, KeystrokePollResponse, ProtocolError
|
||||
from coax.protocol import Command, Status, TerminalId, Control, SecondaryControl, _execute_read_command, _execute_write_command, _pack_command_word, _unpack_command_word, _unpack_data_words, _unpack_data_word
|
||||
from coax.protocol import Command, Status, TerminalId, Control, SecondaryControl, pack_command_word, unpack_command_word, pack_data_word, unpack_data_word, pack_data_words, unpack_data_words, _execute_read_command, _execute_write_command
|
||||
|
||||
class PollResponseTestCase(unittest.TestCase):
|
||||
def test_is_power_on_reset_complete(self):
|
||||
@@ -95,13 +95,61 @@ class SecondaryControlTestCase(unittest.TestCase):
|
||||
|
||||
self.assertEqual(control.value, 0b00000001)
|
||||
|
||||
class PackCommandWordTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
self.assertEqual(pack_command_word(Command.POLL_ACK), 0b0001000101)
|
||||
|
||||
class UnpackCommandWordTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
# Act
|
||||
command = unpack_command_word(0b0001000101)
|
||||
|
||||
# Assert
|
||||
self.assertEqual(command, Command.POLL_ACK)
|
||||
|
||||
def test_command_bit_not_set_error(self):
|
||||
with self.assertRaisesRegex(ProtocolError, 'Word does not have command bit set'):
|
||||
unpack_command_word(0b0001000100)
|
||||
|
||||
class PackDataWordTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
self.assertEqual(pack_data_word(0x00), 0b0000000010)
|
||||
self.assertEqual(pack_data_word(0x01), 0b0000000100)
|
||||
self.assertEqual(pack_data_word(0xff), 0b1111111110)
|
||||
|
||||
def test_disable_set_parity(self):
|
||||
self.assertEqual(pack_data_word(0x00, set_parity=False), 0b0000000000)
|
||||
self.assertEqual(pack_data_word(0x01, set_parity=False), 0b0000000100)
|
||||
self.assertEqual(pack_data_word(0xff, set_parity=False), 0b1111111100)
|
||||
|
||||
class UnpackDataWordTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
self.assertEqual(unpack_data_word(0b0000000010), 0x00)
|
||||
self.assertEqual(unpack_data_word(0b1111111110), 0xff)
|
||||
|
||||
def test_data_bit_not_set_error(self):
|
||||
with self.assertRaisesRegex(ProtocolError, 'Word does not have data bit set'):
|
||||
unpack_data_word(0b0000000011)
|
||||
|
||||
def test_parity_error(self):
|
||||
with self.assertRaisesRegex(ProtocolError, 'Parity error'):
|
||||
unpack_data_word(0b0000000000, check_parity=True)
|
||||
|
||||
class PackDataWordsTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
self.assertEqual(pack_data_words(bytes.fromhex('00 ff')), [0b0000000010, 0b1111111110])
|
||||
|
||||
class UnpackDataWordsTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
self.assertEqual(unpack_data_words([0b0000000010, 0b1111111110]), bytes.fromhex('00 ff'))
|
||||
|
||||
class ExecuteReadCommandTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
|
||||
def test(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
|
||||
command_word = pack_command_word(Command.READ_TERMINAL_ID)
|
||||
|
||||
self.interface.execute = Mock(return_value=[0b0000000010])
|
||||
|
||||
@@ -110,25 +158,25 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
|
||||
|
||||
def test_allow_trta_response(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.POLL)
|
||||
command_word = pack_command_word(Command.POLL)
|
||||
|
||||
self.interface.execute = Mock(return_value=[0b0000000000])
|
||||
|
||||
# Act and assert
|
||||
self.assertEqual(_execute_read_command(self.interface, command_word, allow_trta_response=True, trta_value='TRTA'), 'TRTA')
|
||||
|
||||
def test_disable_unpack_data_words(self):
|
||||
def test_disable_unpack(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.POLL)
|
||||
command_word = pack_command_word(Command.POLL)
|
||||
|
||||
self.interface.execute = Mock(return_value=[0b1111111110])
|
||||
|
||||
# Act and assert
|
||||
self.assertEqual(_execute_read_command(self.interface, command_word, unpack_data_words=False), [0b1111111110])
|
||||
self.assertEqual(_execute_read_command(self.interface, command_word, unpack=False), [0b1111111110])
|
||||
|
||||
def test_unexpected_response_length(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
|
||||
command_word = pack_command_word(Command.READ_TERMINAL_ID)
|
||||
|
||||
self.interface.execute = Mock(return_value=[])
|
||||
|
||||
@@ -138,7 +186,7 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
|
||||
|
||||
def test_timeout_is_passed_to_interface(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.READ_TERMINAL_ID)
|
||||
command_word = pack_command_word(Command.READ_TERMINAL_ID)
|
||||
|
||||
self.interface.execute = Mock(return_value=[0b0000000010])
|
||||
|
||||
@@ -154,7 +202,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
|
||||
|
||||
def test(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.WRITE_DATA)
|
||||
command_word = pack_command_word(Command.WRITE_DATA)
|
||||
|
||||
self.interface.execute = Mock(return_value=[0b0000000000])
|
||||
|
||||
@@ -163,7 +211,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
|
||||
|
||||
def test_unexpected_response_length(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.WRITE_DATA)
|
||||
command_word = pack_command_word(Command.WRITE_DATA)
|
||||
|
||||
self.interface.execute = Mock(return_value=[])
|
||||
|
||||
@@ -173,7 +221,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
|
||||
|
||||
def test_not_trta_response(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.WRITE_DATA)
|
||||
command_word = pack_command_word(Command.WRITE_DATA)
|
||||
|
||||
self.interface.execute = Mock(return_value=[0b0000000010])
|
||||
|
||||
@@ -183,7 +231,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
|
||||
|
||||
def test_timeout_is_passed_to_interface(self):
|
||||
# Arrange
|
||||
command_word = _pack_command_word(Command.WRITE_DATA)
|
||||
command_word = pack_command_word(Command.WRITE_DATA)
|
||||
|
||||
self.interface.execute = Mock(return_value=[0b0000000000])
|
||||
|
||||
@@ -193,50 +241,5 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10)
|
||||
|
||||
class PackCommandWordTestCase(unittest.TestCase):
|
||||
def test_without_address(self):
|
||||
self.assertEqual(_pack_command_word(Command.POLL_ACK), 0b0001000101)
|
||||
|
||||
def test_with_address(self):
|
||||
self.assertEqual(_pack_command_word(Command.POLL_ACK, address=7), 0b1111000101)
|
||||
|
||||
class UnpackCommandWordTestCase(unittest.TestCase):
|
||||
def test_without_address(self):
|
||||
# Act
|
||||
(address, command) = _unpack_command_word(0b0001000101)
|
||||
|
||||
# Assert
|
||||
self.assertEqual(address, 0)
|
||||
self.assertEqual(command, Command.POLL_ACK)
|
||||
|
||||
def test_with_address(self):
|
||||
# Act
|
||||
(address, command) = _unpack_command_word(0b1111000101)
|
||||
|
||||
# Assert
|
||||
self.assertEqual(address, 7)
|
||||
self.assertEqual(command, Command.POLL_ACK)
|
||||
|
||||
def test_command_bit_not_set_error(self):
|
||||
with self.assertRaisesRegex(ProtocolError, 'Word does not have command bit set'):
|
||||
_unpack_command_word(0b0001000100)
|
||||
|
||||
class UnpackDataWordsTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
self.assertEqual(_unpack_data_words([0b0000000010, 0b1111111110]), bytes.fromhex('00 ff'))
|
||||
|
||||
class UnpackDataWordTestCase(unittest.TestCase):
|
||||
def test(self):
|
||||
self.assertEqual(_unpack_data_word(0b0000000010), 0x00)
|
||||
self.assertEqual(_unpack_data_word(0b1111111110), 0xff)
|
||||
|
||||
def test_data_bit_not_set_error(self):
|
||||
with self.assertRaisesRegex(ProtocolError, 'Word does not have data bit set'):
|
||||
_unpack_data_word(0b0000000011)
|
||||
|
||||
def test_parity_error(self):
|
||||
with self.assertRaisesRegex(ProtocolError, 'Parity error'):
|
||||
_unpack_data_word(0b0000000000, check_parity=True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user