mirror of
https://github.com/lowobservable/coax.git
synced 2026-04-24 19:40:33 +00:00
Fix issue where keyword arguments were not passed to interface
This commit is contained in:
@@ -196,7 +196,7 @@ def _execute_read_command(interface, command, response_length=1,
|
|||||||
"""Execute a standard read command."""
|
"""Execute a standard read command."""
|
||||||
command_word = _pack_command_word(command)
|
command_word = _pack_command_word(command)
|
||||||
|
|
||||||
response = interface.execute(command_word, response_length=response_length)
|
response = interface.execute(command_word, response_length=response_length, **kwargs)
|
||||||
|
|
||||||
if allow_trta_response and len(response) == 1 and response[0] == 0:
|
if allow_trta_response and len(response) == 1 and response[0] == 0:
|
||||||
return trta_value
|
return trta_value
|
||||||
@@ -206,11 +206,11 @@ def _execute_read_command(interface, command, response_length=1,
|
|||||||
|
|
||||||
return _unpack_data_words(response) if unpack_data_words else response
|
return _unpack_data_words(response) if unpack_data_words else response
|
||||||
|
|
||||||
def _execute_write_command(interface, command, data=None):
|
def _execute_write_command(interface, command, data=None, **kwargs):
|
||||||
"""Execute a standard write command."""
|
"""Execute a standard write command."""
|
||||||
command_word = _pack_command_word(command)
|
command_word = _pack_command_word(command)
|
||||||
|
|
||||||
response = interface.execute(command_word, data)
|
response = interface.execute(command_word, data, **kwargs)
|
||||||
|
|
||||||
if len(response) != 1:
|
if len(response) != 1:
|
||||||
raise ProtocolError(f'Expected 1 word {command.name} response')
|
raise ProtocolError(f'Expected 1 word {command.name} response')
|
||||||
|
|||||||
@@ -58,6 +58,16 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
|
|||||||
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word READ_TERMINAL_ID response'):
|
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word READ_TERMINAL_ID response'):
|
||||||
_execute_read_command(self.interface, Command.READ_TERMINAL_ID)
|
_execute_read_command(self.interface, Command.READ_TERMINAL_ID)
|
||||||
|
|
||||||
|
def test_timeout_is_passed_to_interface(self):
|
||||||
|
# Arrange
|
||||||
|
self.interface.execute = Mock(return_value=[0b0000000010])
|
||||||
|
|
||||||
|
# Act
|
||||||
|
_execute_read_command(self.interface, Command.READ_TERMINAL_ID, timeout=10)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10)
|
||||||
|
|
||||||
class ExecuteWriteCommandTestCase(unittest.TestCase):
|
class ExecuteWriteCommandTestCase(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.interface = Mock()
|
self.interface = Mock()
|
||||||
@@ -85,6 +95,16 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
|
|||||||
with self.assertRaisesRegex(ProtocolError, 'Expected TR/TA response'):
|
with self.assertRaisesRegex(ProtocolError, 'Expected TR/TA response'):
|
||||||
_execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'))
|
_execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'))
|
||||||
|
|
||||||
|
def test_timeout_is_passed_to_interface(self):
|
||||||
|
# Arrange
|
||||||
|
self.interface.execute = Mock(return_value=[0b0000000000])
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
_execute_write_command(self.interface, Command.WRITE_DATA, bytes.fromhex('de ad be ef'), timeout=10)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10)
|
||||||
|
|
||||||
class PackCommandWordTestCase(unittest.TestCase):
|
class PackCommandWordTestCase(unittest.TestCase):
|
||||||
def test_without_address(self):
|
def test_without_address(self):
|
||||||
self.assertEqual(_pack_command_word(Command.POLL_ACK), 0b001000101)
|
self.assertEqual(_pack_command_word(Command.POLL_ACK), 0b001000101)
|
||||||
|
|||||||
Reference in New Issue
Block a user