mirror of
https://github.com/lowobservable/coax.git
synced 2026-02-27 17:32:39 +00:00
303 lines
10 KiB
Python
303 lines
10 KiB
Python
import unittest
|
|
from unittest.mock import Mock
|
|
|
|
import context
|
|
|
|
from coax.interface import Interface, normalize_frame
|
|
from coax.protocol import FrameFormat, ReadAddressCounterHi, ReadAddressCounterLo
|
|
from coax.exceptions import InterfaceError, ReceiveTimeout, ProtocolError
|
|
|
|
class InterfaceExecuteTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
self.interface = Interface()
|
|
|
|
self.interface._transmit_receive = Mock()
|
|
|
|
def test_single_command(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_00]]
|
|
|
|
# Act
|
|
response = self.interface.execute(ReadAddressCounterHi())
|
|
|
|
# Assert
|
|
self.assertEqual(response, 0x02)
|
|
|
|
self.interface._transmit_receive.assert_called_once()
|
|
|
|
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
|
|
|
|
self.assertEqual(outbound_frames, [(None, (FrameFormat.WORD_DATA, 0b000_00101_01))])
|
|
self.assertEqual(response_lengths, [1])
|
|
|
|
def test_single_addressed_command(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_00]]
|
|
|
|
# Act
|
|
response = self.interface.execute((0b111000, ReadAddressCounterHi()))
|
|
|
|
# Assert
|
|
self.assertEqual(response, 0x02)
|
|
|
|
self.interface._transmit_receive.assert_called_once()
|
|
|
|
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
|
|
|
|
self.assertEqual(outbound_frames, [(0b111000, (FrameFormat.WORD_DATA, 0b000_00101_01))])
|
|
self.assertEqual(response_lengths, [1])
|
|
|
|
def test_multiple_commands(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_00], [0b11111111_00]]
|
|
|
|
# Act
|
|
response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
|
|
|
|
# Assert
|
|
self.assertEqual(response, [0x02, 0xff])
|
|
|
|
self.interface._transmit_receive.assert_called_once()
|
|
|
|
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
|
|
|
|
self.assertEqual(outbound_frames, [(None, (FrameFormat.WORD_DATA, 0b000_00101_01)), (None, (FrameFormat.WORD_DATA, 0b000_10101_01))])
|
|
self.assertEqual(response_lengths, [1, 1])
|
|
|
|
def test_multiple_addressed_commands(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_00], [0b11111111_00]]
|
|
|
|
# Act
|
|
response = self.interface.execute([(0b111000, ReadAddressCounterHi()), (0b111000, ReadAddressCounterLo())])
|
|
|
|
# Assert
|
|
self.assertEqual(response, [0x02, 0xff])
|
|
|
|
self.interface._transmit_receive.assert_called_once()
|
|
|
|
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
|
|
|
|
self.assertEqual(outbound_frames, [(0b111000, (FrameFormat.WORD_DATA, 0b000_00101_01)), (0b111000, (FrameFormat.WORD_DATA, 0b000_10101_01))])
|
|
self.assertEqual(response_lengths, [1, 1])
|
|
|
|
def test_timeout(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_00]]
|
|
|
|
# Act
|
|
response = self.interface.execute(ReadAddressCounterHi(), timeout=0.1)
|
|
|
|
# Assert
|
|
self.interface._transmit_receive.assert_called_once()
|
|
|
|
(_, _, timeout) = self.interface._transmit_receive.call_args[0]
|
|
|
|
self.assertEqual(timeout, 0.1)
|
|
|
|
def test_single_command_interface_error(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.side_effect=InterfaceError()
|
|
|
|
# Act and assert
|
|
with self.assertRaises(InterfaceError):
|
|
self.interface.execute(ReadAddressCounterHi())
|
|
|
|
def test_multiple_command_interface_error(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.side_effect=InterfaceError()
|
|
|
|
# Act and assert
|
|
with self.assertRaises(InterfaceError):
|
|
self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
|
|
|
|
def test_single_command_receive_timeout(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[ReceiveTimeout()]
|
|
|
|
# Act and assert
|
|
with self.assertRaises(ReceiveTimeout):
|
|
self.interface.execute(ReadAddressCounterHi())
|
|
|
|
def test_multiple_command_receive_timeout(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_00], ReceiveTimeout()]
|
|
|
|
# Act
|
|
response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
|
|
|
|
# Assert
|
|
self.assertEqual(len(response), 2)
|
|
|
|
self.assertEqual(response[0], 0x02)
|
|
self.assertIsInstance(response[1], ReceiveTimeout)
|
|
|
|
def test_single_command_protocol_error(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_01]]
|
|
|
|
# Act and assert
|
|
with self.assertRaises(ProtocolError):
|
|
self.interface.execute(ReadAddressCounterHi())
|
|
|
|
def test_multiple_command_protocol_error(self):
|
|
# Arrange
|
|
self.interface._transmit_receive.return_value=[[0b00000010_00], [0b00000010_01]]
|
|
|
|
# Act
|
|
response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
|
|
|
|
# Assert
|
|
self.assertEqual(len(response), 2)
|
|
|
|
self.assertEqual(response[0], 0x02)
|
|
self.assertIsInstance(response[1], ProtocolError)
|
|
|
|
class NormalizeFrameTestCase(unittest.TestCase):
|
|
def test_words_with_no_address_no_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 0)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_words_with_no_address_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9))
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 9)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_words_with_address_no_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 0)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_words_with_address_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9))
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 9)
|
|
self.assertEqual(repeat_offset, 1)
|
|
|
|
def test_word_data_with_no_address_no_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac])
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 0)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_word_data_with_no_address_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9))
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 9)
|
|
self.assertEqual(repeat_offset, 1)
|
|
|
|
def test_word_data_with_address_no_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac])
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 0)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_word_data_with_address_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9))
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 9)
|
|
self.assertEqual(repeat_offset, 2)
|
|
|
|
def test_data_with_no_address_no_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac])
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 0)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_data_with_no_address_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9))
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 9)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_data_with_address_no_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac])
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 0)
|
|
self.assertEqual(repeat_offset, 0)
|
|
|
|
def test_data_with_address_repeat(self):
|
|
# Arrange
|
|
frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9))
|
|
|
|
# Act
|
|
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
|
|
|
|
# Assert
|
|
self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10])
|
|
self.assertEqual(repeat_count, 9)
|
|
self.assertEqual(repeat_offset, 1)
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|