From 2ce8075dae23e592a79e5dc48e4e7a0d0641f5d6 Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Wed, 2 Aug 2023 17:12:04 -0500 Subject: [PATCH] Refactor serial interface _pack_transmit_receive_message --- pycoax/coax/interface.py | 48 ++++++++-- pycoax/coax/protocol.py | 8 +- pycoax/coax/serial_interface.py | 67 ++++---------- pycoax/tests/test_interface.py | 149 +++++++++++++++++++++++++++++++- 4 files changed, 216 insertions(+), 56 deletions(-) diff --git a/pycoax/coax/interface.py b/pycoax/coax/interface.py index a885803..900c426 100644 --- a/pycoax/coax/interface.py +++ b/pycoax/coax/interface.py @@ -5,6 +5,7 @@ coax.interface from enum import Enum +from .protocol import FrameFormat, pack_data_word from .exceptions import ProtocolError class Interface: @@ -50,12 +51,49 @@ class InterfaceFeature(Enum): PROTOCOL_3299 = 0x10 -class FrameFormat(Enum): - """3270 coax frame format.""" +def normalize_frame(address, frame): + """Convert a coax frame into words, repeat count and offset.""" + words = [] - WORDS = 1 # 10-bit words - WORD_DATA = 2 # 10-bit word, 8-bit data words - DATA = 4 # 8-bit data words + repeat_count = 0 + repeat_offset = 0 + + if frame[0] == FrameFormat.WORDS: + if isinstance(frame[1], tuple): + repeat_count = frame[1][1] + + words += frame[1][0] + else: + words += frame[1] + elif frame[0] == FrameFormat.WORD_DATA: + words.append(frame[1]) + + if len(frame) > 2: + if isinstance(frame[2], tuple): + repeat_offset = 1 + repeat_count = frame[2][1] + + words += [pack_data_word(byte) for byte in frame[2][0]] + else: + words += [pack_data_word(byte) for byte in frame[2]] + elif frame[0] == FrameFormat.DATA: + if isinstance(frame[1], tuple): + repeat_count = frame[1][1] + + words += [pack_data_word(byte) for byte in frame[1][0]] + else: + words += [pack_data_word(byte) for byte in frame[1]] + + if address is not None: + if address < 0 or address > 63: + raise ValueError('Address must be between 0 and 63') + + words.insert(0, address) + + if repeat_count > 0: + repeat_offset += 1 + + return (words, repeat_count, repeat_offset) def _is_command(command): return hasattr(command, 'pack_outbound_frame') and hasattr(command, 'unpack_inbound_frame') diff --git a/pycoax/coax/protocol.py b/pycoax/coax/protocol.py index b45ee18..63b4979 100644 --- a/pycoax/coax/protocol.py +++ b/pycoax/coax/protocol.py @@ -5,10 +5,16 @@ coax.protocol from enum import Enum -from .interface import FrameFormat from .parity import odd_parity from .exceptions import ProtocolError +class FrameFormat(Enum): + """3270 coax frame format.""" + + WORDS = 1 # 10-bit words + WORD_DATA = 2 # 10-bit word, 8-bit data words + DATA = 4 # 8-bit data words + class Command(Enum): """Terminal command.""" diff --git a/pycoax/coax/serial_interface.py b/pycoax/coax/serial_interface.py index 1029094..ecf59d0 100644 --- a/pycoax/coax/serial_interface.py +++ b/pycoax/coax/serial_interface.py @@ -11,8 +11,7 @@ from contextlib import contextmanager from serial import Serial, SerialException from sliplib import SlipWrapper, ProtocolError -from .interface import Interface, InterfaceFeature, FrameFormat -from .protocol import pack_data_word +from .interface import Interface, InterfaceFeature, normalize_frame from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout class SerialInterface(Interface): @@ -185,55 +184,27 @@ def open_serial_interface(serial_port, reset=True): yield interface def _pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds): + # Convert the three frame formats to a simple list of 10-bit words with + # a repeat count and offset. + (words, repeat_count, repeat_offset) = normalize_frame(address, frame) + message = bytes([0x06]) - repeat_count = 0 - repeat_offset = 0 - bytes_ = bytearray() - - if frame[0] == FrameFormat.WORDS: - if isinstance(frame[1], tuple): - repeat_count = frame[1][1] - - for word in frame[1][0]: - bytes_ += struct.pack(' 2: - if isinstance(frame[2], tuple): - repeat_offset = 1 - repeat_count = frame[2][1] - - for byte in frame[2][0]: - bytes_ += struct.pack(' 63: - raise ValueError('Address must be between 0 and 63') - - if repeat_count > 0: - repeat_offset += 1 - - bytes_ = struct.pack('H', (repeat_offset << 15) | repeat_count) - message += bytes_ + + # Set the 3299 mode flag. + if address is not None: + words[0] |= 0x8000 + + for word in words: + message += struct.pack('HH', response_length, timeout_milliseconds) return message diff --git a/pycoax/tests/test_interface.py b/pycoax/tests/test_interface.py index f9c9d8b..49f0680 100644 --- a/pycoax/tests/test_interface.py +++ b/pycoax/tests/test_interface.py @@ -3,8 +3,8 @@ from unittest.mock import Mock import context -from coax.interface import Interface, FrameFormat -from coax.protocol import ReadAddressCounterHi, ReadAddressCounterLo +from coax.interface import Interface, normalize_frame +from coax.protocol import FrameFormat, ReadAddressCounterHi, ReadAddressCounterLo from coax.exceptions import InterfaceError, ReceiveTimeout, ProtocolError class InterfaceExecuteTestCase(unittest.TestCase): @@ -153,5 +153,150 @@ class InterfaceExecuteTestCase(unittest.TestCase): self.assertEqual(response[0], 0x02) self.assertIsInstance(response[1], ProtocolError) +class NormalizeFrameTestCase(unittest.TestCase): + def test_words_with_no_address_no_repeat(self): + # Arrange + frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(None, frame) + + # Assert + self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 0) + self.assertEqual(repeat_offset, 0) + + def test_words_with_no_address_repeat(self): + # Arrange + frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9)) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(None, frame) + + # Assert + self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 9) + self.assertEqual(repeat_offset, 0) + + def test_words_with_address_no_repeat(self): + # Arrange + frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame) + + # Assert + self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 0) + self.assertEqual(repeat_offset, 0) + + def test_words_with_address_repeat(self): + # Arrange + frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9)) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame) + + # Assert + self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 9) + self.assertEqual(repeat_offset, 1) + + def test_word_data_with_no_address_no_repeat(self): + # Arrange + frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac]) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(None, frame) + + # Assert + self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 0) + self.assertEqual(repeat_offset, 0) + + def test_word_data_with_no_address_repeat(self): + # Arrange + frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9)) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(None, frame) + + # Assert + self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 9) + self.assertEqual(repeat_offset, 1) + + def test_word_data_with_address_no_repeat(self): + # Arrange + frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac]) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame) + + # Assert + self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 0) + self.assertEqual(repeat_offset, 0) + + def test_word_data_with_address_repeat(self): + # Arrange + frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9)) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame) + + # Assert + self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 9) + self.assertEqual(repeat_offset, 2) + + def test_data_with_no_address_no_repeat(self): + # Arrange + frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac]) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(None, frame) + + # Assert + self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 0) + self.assertEqual(repeat_offset, 0) + + def test_data_with_no_address_repeat(self): + # Arrange + frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9)) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(None, frame) + + # Assert + self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 9) + self.assertEqual(repeat_offset, 0) + + def test_data_with_address_no_repeat(self): + # Arrange + frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac]) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame) + + # Assert + self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 0) + self.assertEqual(repeat_offset, 0) + + def test_data_with_address_repeat(self): + # Arrange + frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9)) + + # Act + (words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame) + + # Assert + self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10]) + self.assertEqual(repeat_count, 9) + self.assertEqual(repeat_offset, 1) + if __name__ == '__main__': unittest.main()