# Source: lowobservable.coax/pycoax/tests/test_interface.py
# (Python, 303 lines, 10 KiB)

import unittest
from unittest.mock import Mock
import context
from coax.interface import Interface, normalize_frame
from coax.protocol import FrameFormat, ReadAddressCounterHi, ReadAddressCounterLo
from coax.exceptions import InterfaceError, ReceiveTimeout, ProtocolError
class InterfaceExecuteTestCase(unittest.TestCase):
    """Tests for ``Interface.execute`` with ``_transmit_receive`` mocked.

    Every test seeds the mock with a return value or a side effect, calls
    ``execute`` and then checks the value handed back to the caller and,
    where relevant, the positional arguments that reached
    ``_transmit_receive``.
    """

    def setUp(self):
        """Create an ``Interface`` whose ``_transmit_receive`` is a ``Mock``."""
        self.interface = Interface()

        # Replacing the method lets each test decide what execute() receives
        # back and inspect what execute() passed down.
        self.interface._transmit_receive = Mock()

    def _transmit_receive_args(self):
        """Assert a single ``_transmit_receive`` call and return its positional args."""
        self.interface._transmit_receive.assert_called_once()

        return self.interface._transmit_receive.call_args[0]

    def test_single_command(self):
        """A bare command is sent unaddressed and yields a scalar response."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_00]]

        # Act
        response = self.interface.execute(ReadAddressCounterHi())

        # Assert
        self.assertEqual(response, 0x02)

        (outbound_frames, response_lengths, _) = self._transmit_receive_args()

        self.assertEqual(outbound_frames, [(None, (FrameFormat.WORD_DATA, 0b000_00101_01))])
        self.assertEqual(response_lengths, [1])

    def test_single_addressed_command(self):
        """An ``(address, command)`` tuple carries the address into the outbound frame."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_00]]

        # Act
        response = self.interface.execute((0b111000, ReadAddressCounterHi()))

        # Assert
        self.assertEqual(response, 0x02)

        (outbound_frames, response_lengths, _) = self._transmit_receive_args()

        self.assertEqual(outbound_frames, [(0b111000, (FrameFormat.WORD_DATA, 0b000_00101_01))])
        self.assertEqual(response_lengths, [1])

    def test_multiple_commands(self):
        """A list of commands goes out in one call and yields a list of responses."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_00], [0b11111111_00]]

        # Act
        response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])

        # Assert
        self.assertEqual(response, [0x02, 0xff])

        (outbound_frames, response_lengths, _) = self._transmit_receive_args()

        self.assertEqual(
            outbound_frames,
            [
                (None, (FrameFormat.WORD_DATA, 0b000_00101_01)),
                (None, (FrameFormat.WORD_DATA, 0b000_10101_01)),
            ],
        )
        self.assertEqual(response_lengths, [1, 1])

    def test_multiple_addressed_commands(self):
        """A list of ``(address, command)`` tuples keeps each address with its frame."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_00], [0b11111111_00]]

        # Act
        response = self.interface.execute(
            [(0b111000, ReadAddressCounterHi()), (0b111000, ReadAddressCounterLo())]
        )

        # Assert
        self.assertEqual(response, [0x02, 0xff])

        (outbound_frames, response_lengths, _) = self._transmit_receive_args()

        self.assertEqual(
            outbound_frames,
            [
                (0b111000, (FrameFormat.WORD_DATA, 0b000_00101_01)),
                (0b111000, (FrameFormat.WORD_DATA, 0b000_10101_01)),
            ],
        )
        self.assertEqual(response_lengths, [1, 1])

    def test_timeout(self):
        """The ``timeout`` keyword is forwarded as the third positional argument."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_00]]

        # Act - the response itself is not under test here, so it is not kept.
        self.interface.execute(ReadAddressCounterHi(), timeout=0.1)

        # Assert
        (_, _, timeout) = self._transmit_receive_args()

        self.assertEqual(timeout, 0.1)

    def test_single_command_interface_error(self):
        """An ``InterfaceError`` raised by the transport propagates for a single command."""
        # Arrange
        self.interface._transmit_receive.side_effect = InterfaceError()

        # Act and assert
        with self.assertRaises(InterfaceError):
            self.interface.execute(ReadAddressCounterHi())

    def test_multiple_command_interface_error(self):
        """An ``InterfaceError`` raised by the transport propagates for a command list."""
        # Arrange
        self.interface._transmit_receive.side_effect = InterfaceError()

        # Act and assert
        with self.assertRaises(InterfaceError):
            self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])

    def test_single_command_receive_timeout(self):
        """A ``ReceiveTimeout`` returned for a single command is raised to the caller."""
        # Arrange
        self.interface._transmit_receive.return_value = [ReceiveTimeout()]

        # Act and assert
        with self.assertRaises(ReceiveTimeout):
            self.interface.execute(ReadAddressCounterHi())

    def test_multiple_command_receive_timeout(self):
        """In a command list, a ``ReceiveTimeout`` is returned in place, not raised."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_00], ReceiveTimeout()]

        # Act
        response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])

        # Assert
        self.assertEqual(len(response), 2)

        self.assertEqual(response[0], 0x02)
        self.assertIsInstance(response[1], ReceiveTimeout)

    def test_single_command_protocol_error(self):
        """The response word ``0b00000010_01`` makes a single command raise ``ProtocolError``."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_01]]

        # Act and assert
        with self.assertRaises(ProtocolError):
            self.interface.execute(ReadAddressCounterHi())

    def test_multiple_command_protocol_error(self):
        """In a command list, a ``ProtocolError`` is returned in place, not raised."""
        # Arrange
        self.interface._transmit_receive.return_value = [[0b00000010_00], [0b00000010_01]]

        # Act
        response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])

        # Assert
        self.assertEqual(len(response), 2)

        self.assertEqual(response[0], 0x02)
        self.assertIsInstance(response[1], ProtocolError)
class NormalizeFrameTestCase(unittest.TestCase):
    """Tests for ``normalize_frame`` across frame formats, addressing and repeats.

    ``normalize_frame(address, frame)`` is expected to return a
    ``(words, repeat_count, repeat_offset)`` triple. Each test covers one
    combination of frame format (``WORDS``, ``WORD_DATA``, ``DATA``), address
    (``None`` or ``0b111000``) and payload (a plain list or a
    ``(list, repeat_count)`` pair).
    """

    def _assert_normalized(self, address, frame, words, repeat_count, repeat_offset):
        """Normalize ``frame`` for ``address`` and compare all three results.

        Args:
            address: Address passed to ``normalize_frame`` (``None`` for none).
            frame: Frame tuple passed to ``normalize_frame``.
            words: Expected list of words.
            repeat_count: Expected repeat count.
            repeat_offset: Expected repeat offset.
        """
        # Act
        (actual_words, actual_repeat_count, actual_repeat_offset) = normalize_frame(address, frame)

        # Assert
        self.assertEqual(actual_words, words)
        self.assertEqual(actual_repeat_count, repeat_count)
        self.assertEqual(actual_repeat_offset, repeat_offset)

    def test_words_with_no_address_no_repeat(self):
        """``WORDS`` frame, no address, no repeat: words pass through unchanged."""
        frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])

        self._assert_normalized(
            None,
            frame,
            words=[0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=0,
            repeat_offset=0,
        )

    def test_words_with_no_address_repeat(self):
        """``WORDS`` frame, no address, repeat 9: offset 0 is expected."""
        frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9))

        self._assert_normalized(
            None,
            frame,
            words=[0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=9,
            repeat_offset=0,
        )

    def test_words_with_address_no_repeat(self):
        """``WORDS`` frame with address, no repeat: address word comes first."""
        frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])

        self._assert_normalized(
            0b111000,
            frame,
            words=[0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=0,
            repeat_offset=0,
        )

    def test_words_with_address_repeat(self):
        """``WORDS`` frame with address, repeat 9: offset 1 is expected."""
        frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9))

        self._assert_normalized(
            0b111000,
            frame,
            words=[0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=9,
            repeat_offset=1,
        )

    def test_word_data_with_no_address_no_repeat(self):
        """``WORD_DATA`` frame, no address, no repeat: word followed by data words."""
        frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac])

        self._assert_normalized(
            None,
            frame,
            words=[0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=0,
            repeat_offset=0,
        )

    def test_word_data_with_no_address_repeat(self):
        """``WORD_DATA`` frame, no address, repeat 9: offset 1 is expected."""
        frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9))

        self._assert_normalized(
            None,
            frame,
            words=[0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=9,
            repeat_offset=1,
        )

    def test_word_data_with_address_no_repeat(self):
        """``WORD_DATA`` frame with address, no repeat: address word comes first."""
        frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac])

        self._assert_normalized(
            0b111000,
            frame,
            words=[0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=0,
            repeat_offset=0,
        )

    def test_word_data_with_address_repeat(self):
        """``WORD_DATA`` frame with address, repeat 9: offset 2 is expected."""
        frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9))

        self._assert_normalized(
            0b111000,
            frame,
            words=[0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=9,
            repeat_offset=2,
        )

    def test_data_with_no_address_no_repeat(self):
        """``DATA`` frame, no address, no repeat: bytes become data words."""
        frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac])

        self._assert_normalized(
            None,
            frame,
            words=[0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=0,
            repeat_offset=0,
        )

    def test_data_with_no_address_repeat(self):
        """``DATA`` frame, no address, repeat 9: offset 0 is expected."""
        frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9))

        self._assert_normalized(
            None,
            frame,
            words=[0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=9,
            repeat_offset=0,
        )

    def test_data_with_address_no_repeat(self):
        """``DATA`` frame with address, no repeat: address word comes first."""
        frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac])

        self._assert_normalized(
            0b111000,
            frame,
            words=[0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=0,
            repeat_offset=0,
        )

    def test_data_with_address_repeat(self):
        """``DATA`` frame with address, repeat 9: offset 1 is expected."""
        frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9))

        self._assert_normalized(
            0b111000,
            frame,
            words=[0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10],
            repeat_count=9,
            repeat_offset=1,
        )
# Run the test cases only when this file is executed as a script; importing
# the module (for example from a test runner) must not start a test run.
if __name__ == '__main__':
    unittest.main()