Refactor serial interface _pack_transmit_receive_message

This commit is contained in:
Andrew Kay
2023-08-02 17:12:04 -05:00
parent 9033653ca7
commit 2ce8075dae
4 changed files with 216 additions and 56 deletions

View File

@@ -5,6 +5,7 @@ coax.interface
from enum import Enum
from .protocol import FrameFormat, pack_data_word
from .exceptions import ProtocolError
class Interface:
@@ -50,12 +51,49 @@ class InterfaceFeature(Enum):
PROTOCOL_3299 = 0x10
class FrameFormat(Enum):
"""3270 coax frame format."""
def normalize_frame(address, frame):
"""Convert a coax frame into words, repeat count and offset."""
words = []
WORDS = 1 # 10-bit words
WORD_DATA = 2 # 10-bit word, 8-bit data words
DATA = 4 # 8-bit data words
repeat_count = 0
repeat_offset = 0
if frame[0] == FrameFormat.WORDS:
if isinstance(frame[1], tuple):
repeat_count = frame[1][1]
words += frame[1][0]
else:
words += frame[1]
elif frame[0] == FrameFormat.WORD_DATA:
words.append(frame[1])
if len(frame) > 2:
if isinstance(frame[2], tuple):
repeat_offset = 1
repeat_count = frame[2][1]
words += [pack_data_word(byte) for byte in frame[2][0]]
else:
words += [pack_data_word(byte) for byte in frame[2]]
elif frame[0] == FrameFormat.DATA:
if isinstance(frame[1], tuple):
repeat_count = frame[1][1]
words += [pack_data_word(byte) for byte in frame[1][0]]
else:
words += [pack_data_word(byte) for byte in frame[1]]
if address is not None:
if address < 0 or address > 63:
raise ValueError('Address must be between 0 and 63')
words.insert(0, address)
if repeat_count > 0:
repeat_offset += 1
return (words, repeat_count, repeat_offset)
def _is_command(command):
return hasattr(command, 'pack_outbound_frame') and hasattr(command, 'unpack_inbound_frame')

View File

@@ -5,10 +5,16 @@ coax.protocol
from enum import Enum
from .interface import FrameFormat
from .parity import odd_parity
from .exceptions import ProtocolError
class FrameFormat(Enum):
"""3270 coax frame format."""
WORDS = 1 # 10-bit words
WORD_DATA = 2 # 10-bit word, 8-bit data words
DATA = 4 # 8-bit data words
class Command(Enum):
"""Terminal command."""

View File

@@ -11,8 +11,7 @@ from contextlib import contextmanager
from serial import Serial, SerialException
from sliplib import SlipWrapper, ProtocolError
from .interface import Interface, InterfaceFeature, FrameFormat
from .protocol import pack_data_word
from .interface import Interface, InterfaceFeature, normalize_frame
from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout
class SerialInterface(Interface):
@@ -185,55 +184,27 @@ def open_serial_interface(serial_port, reset=True):
yield interface
def _pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds):
# Convert the three frame formats to a simple list of 10-bit words with
# a repeat count and offset.
(words, repeat_count, repeat_offset) = normalize_frame(address, frame)
message = bytes([0x06])
repeat_count = 0
repeat_offset = 0
bytes_ = bytearray()
if frame[0] == FrameFormat.WORDS:
if isinstance(frame[1], tuple):
repeat_count = frame[1][1]
for word in frame[1][0]:
bytes_ += struct.pack('<H', word)
else:
for word in frame[1]:
bytes_ += struct.pack('<H', word)
elif frame[0] == FrameFormat.WORD_DATA:
bytes_ += struct.pack('<H', frame[1])
if len(frame) > 2:
if isinstance(frame[2], tuple):
repeat_offset = 1
repeat_count = frame[2][1]
for byte in frame[2][0]:
bytes_ += struct.pack('<H', pack_data_word(byte))
else:
for byte in frame[2]:
bytes_ += struct.pack('<H', pack_data_word(byte))
elif frame[0] == FrameFormat.DATA:
if isinstance(frame[1], tuple):
repeat_count = frame[1][1]
for byte in frame[1][0]:
bytes_ += struct.pack('<H', pack_data_word(byte))
else:
for byte in frame[1]:
bytes_ += struct.pack('<H', pack_data_word(byte))
if address is not None:
if address < 0 or address > 63:
raise ValueError('Address must be between 0 and 63')
if repeat_count > 0:
repeat_offset += 1
bytes_ = struct.pack('<H', 0x8000 | address) + bytes_
# NOTE: Although the frame normalization routine may result in a repeat
# offset greater than 1 if an addressed WORD_DATA frame has a repeat
# count, this WILL fail to be packed below as it will overflow the
# unsigned short field here. Today, oec does not use a repeat with an
# addressed WORD_DATA frame as the "jumbo write" function will always
# expand addressed frames.
message += struct.pack('>H', (repeat_offset << 15) | repeat_count)
message += bytes_
# Set the 3299 mode flag.
if address is not None:
words[0] |= 0x8000
for word in words:
message += struct.pack('<H', word)
message += struct.pack('>HH', response_length, timeout_milliseconds)
return message

View File

@@ -3,8 +3,8 @@ from unittest.mock import Mock
import context
from coax.interface import Interface, FrameFormat
from coax.protocol import ReadAddressCounterHi, ReadAddressCounterLo
from coax.interface import Interface, normalize_frame
from coax.protocol import FrameFormat, ReadAddressCounterHi, ReadAddressCounterLo
from coax.exceptions import InterfaceError, ReceiveTimeout, ProtocolError
class InterfaceExecuteTestCase(unittest.TestCase):
@@ -153,5 +153,150 @@ class InterfaceExecuteTestCase(unittest.TestCase):
self.assertEqual(response[0], 0x02)
self.assertIsInstance(response[1], ProtocolError)
class NormalizeFrameTestCase(unittest.TestCase):
def test_words_with_no_address_no_repeat(self):
# Arrange
frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
# Act
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
# Assert
self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 0)
self.assertEqual(repeat_offset, 0)
def test_words_with_no_address_repeat(self):
# Arrange
frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9))
# Act
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
# Assert
self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 9)
self.assertEqual(repeat_offset, 0)
def test_words_with_address_no_repeat(self):
# Arrange
frame = (FrameFormat.WORDS, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
# Act
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
# Assert
self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 0)
self.assertEqual(repeat_offset, 0)
def test_words_with_address_repeat(self):
# Arrange
frame = (FrameFormat.WORDS, ([0b10101000_00, 0b10100001_00, 0b10101100_10], 9))
# Act
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
# Assert
self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 9)
self.assertEqual(repeat_offset, 1)
def test_word_data_with_no_address_no_repeat(self):
# Arrange
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac])
# Act
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
# Assert
self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 0)
self.assertEqual(repeat_offset, 0)
def test_word_data_with_no_address_repeat(self):
# Arrange
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9))
# Act
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
# Assert
self.assertEqual(words, [0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 9)
self.assertEqual(repeat_offset, 1)
def test_word_data_with_address_no_repeat(self):
# Arrange
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, [0xa8, 0xa1, 0xac])
# Act
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
# Assert
self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 0)
self.assertEqual(repeat_offset, 0)
def test_word_data_with_address_repeat(self):
# Arrange
frame = (FrameFormat.WORD_DATA, 0b000_01100_01, ([0xa8, 0xa1, 0xac], 9))
# Act
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
# Assert
self.assertEqual(words, [0b0000_111000, 0b000_01100_01, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 9)
self.assertEqual(repeat_offset, 2)
def test_data_with_no_address_no_repeat(self):
# Arrange
frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac])
# Act
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
# Assert
self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 0)
self.assertEqual(repeat_offset, 0)
def test_data_with_no_address_repeat(self):
# Arrange
frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9))
# Act
(words, repeat_count, repeat_offset) = normalize_frame(None, frame)
# Assert
self.assertEqual(words, [0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 9)
self.assertEqual(repeat_offset, 0)
def test_data_with_address_no_repeat(self):
# Arrange
frame = (FrameFormat.DATA, [0xa8, 0xa1, 0xac])
# Act
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
# Assert
self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 0)
self.assertEqual(repeat_offset, 0)
def test_data_with_address_repeat(self):
# Arrange
frame = (FrameFormat.DATA, ([0xa8, 0xa1, 0xac], 9))
# Act
(words, repeat_count, repeat_offset) = normalize_frame(0b111000, frame)
# Assert
self.assertEqual(words, [0b0000_111000, 0b10101000_00, 0b10100001_00, 0b10101100_10])
self.assertEqual(repeat_count, 9)
self.assertEqual(repeat_offset, 1)
if __name__ == '__main__':
unittest.main()