diff --git a/pycoax/coax/interface.py b/pycoax/coax/interface.py index 5ae3b95..e001094 100644 --- a/pycoax/coax/interface.py +++ b/pycoax/coax/interface.py @@ -5,6 +5,8 @@ coax.interface from enum import Enum +from .exceptions import ProtocolError + class Interface: """3270 coax interface.""" @@ -106,7 +108,10 @@ def _unpack_inbound_frames(frames, commands): if isinstance(frame, BaseException): responses.append(frame) else: - response = command.unpack_inbound_frame(frame) + try: + response = command.unpack_inbound_frame(frame) + except ProtocolError as error: + response = error responses.append(response) diff --git a/pycoax/tests/test_interface.py b/pycoax/tests/test_interface.py index c9e5ea4..f9c9d8b 100644 --- a/pycoax/tests/test_interface.py +++ b/pycoax/tests/test_interface.py @@ -5,7 +5,7 @@ import context from coax.interface import Interface, FrameFormat from coax.protocol import ReadAddressCounterHi, ReadAddressCounterLo -from coax.exceptions import ReceiveTimeout +from coax.exceptions import InterfaceError, ReceiveTimeout, ProtocolError class InterfaceExecuteTestCase(unittest.TestCase): def setUp(self): @@ -95,7 +95,23 @@ class InterfaceExecuteTestCase(unittest.TestCase): self.assertEqual(timeout, 0.1) - def test_single_command_exception(self): + def test_single_command_interface_error(self): + # Arrange + self.interface._transmit_receive.side_effect=InterfaceError() + + # Act and assert + with self.assertRaises(InterfaceError): + self.interface.execute(ReadAddressCounterHi()) + + def test_multiple_command_interface_error(self): + # Arrange + self.interface._transmit_receive.side_effect=InterfaceError() + + # Act and assert + with self.assertRaises(InterfaceError): + self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()]) + + def test_single_command_receive_timeout(self): # Arrange self.interface._transmit_receive.return_value=[ReceiveTimeout()] @@ -103,5 +119,39 @@ class InterfaceExecuteTestCase(unittest.TestCase): with self.assertRaises(ReceiveTimeout): self.interface.execute(ReadAddressCounterHi()) + def test_multiple_command_receive_timeout(self): + # Arrange + self.interface._transmit_receive.return_value=[[0b00000010_00], ReceiveTimeout()] + + # Act + response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()]) + + # Assert + self.assertEqual(len(response), 2) + + self.assertEqual(response[0], 0x02) + self.assertIsInstance(response[1], ReceiveTimeout) + + def test_single_command_protocol_error(self): + # Arrange + self.interface._transmit_receive.return_value=[[0b00000010_01]] + + # Act and assert + with self.assertRaises(ProtocolError): + self.interface.execute(ReadAddressCounterHi()) + + def test_multiple_command_protocol_error(self): + # Arrange + self.interface._transmit_receive.return_value=[[0b00000010_00], [0b00000010_01]] + + # Act + response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()]) + + # Assert + self.assertEqual(len(response), 2) + + self.assertEqual(response[0], 0x02) + self.assertIsInstance(response[1], ProtocolError) + if __name__ == '__main__': unittest.main()