mirror of
https://github.com/lowobservable/coax.git
synced 2026-03-01 17:57:41 +00:00
165 lines
4.3 KiB
Python
165 lines
4.3 KiB
Python
"""
|
|
coax.interface
|
|
~~~~~~~~~~~~~~
|
|
"""
|
|
|
|
from enum import Enum
|
|
|
|
from .protocol import FrameFormat, pack_data_word
|
|
from .exceptions import ProtocolError
|
|
|
|
class Interface:
|
|
"""3270 coax interface."""
|
|
|
|
def __init__(self):
|
|
self.features = set()
|
|
|
|
def reset(self):
|
|
"""Reset the interface."""
|
|
raise NotImplementedError
|
|
|
|
def execute(self, commands, timeout=None):
|
|
"""Execute one or more commands."""
|
|
(normalized_commands, has_multiple_commands) = _normalize_commands(commands)
|
|
|
|
responses = self._execute(normalized_commands, timeout)
|
|
|
|
if has_multiple_commands:
|
|
return responses
|
|
|
|
response = responses[0]
|
|
|
|
if isinstance(response, BaseException):
|
|
raise response
|
|
|
|
return response
|
|
|
|
def _execute(self, commands, timeout):
|
|
(outbound_frames, response_lengths) = _pack_outbound_frames(commands)
|
|
|
|
inbound_frames = self._transmit_receive(outbound_frames, response_lengths, timeout)
|
|
|
|
responses = _unpack_inbound_frames(inbound_frames, commands)
|
|
|
|
return responses
|
|
|
|
def _transmit_receive(self, outbound_frames, response_lengths, timeout):
|
|
raise NotImplementedError
|
|
|
|
class InterfaceFeature(Enum):
|
|
"""Interface feature."""
|
|
|
|
PROTOCOL_3299 = 0x10
|
|
|
|
def normalize_frame(address, frame):
|
|
"""Convert a coax frame into words, repeat count and offset."""
|
|
words = []
|
|
|
|
repeat_count = 0
|
|
repeat_offset = 0
|
|
|
|
if frame[0] == FrameFormat.WORDS:
|
|
if isinstance(frame[1], tuple):
|
|
repeat_count = frame[1][1]
|
|
|
|
words += frame[1][0]
|
|
else:
|
|
words += frame[1]
|
|
elif frame[0] == FrameFormat.WORD_DATA:
|
|
words.append(frame[1])
|
|
|
|
if len(frame) > 2:
|
|
if isinstance(frame[2], tuple):
|
|
repeat_offset = 1
|
|
repeat_count = frame[2][1]
|
|
|
|
words += [pack_data_word(byte) for byte in frame[2][0]]
|
|
else:
|
|
words += [pack_data_word(byte) for byte in frame[2]]
|
|
elif frame[0] == FrameFormat.DATA:
|
|
if isinstance(frame[1], tuple):
|
|
repeat_count = frame[1][1]
|
|
|
|
words += [pack_data_word(byte) for byte in frame[1][0]]
|
|
else:
|
|
words += [pack_data_word(byte) for byte in frame[1]]
|
|
|
|
if address is not None:
|
|
if address < 0 or address > 63:
|
|
raise ValueError('Address must be between 0 and 63')
|
|
|
|
words.insert(0, address)
|
|
|
|
if repeat_count > 0:
|
|
repeat_offset += 1
|
|
|
|
return (words, repeat_count, repeat_offset)
|
|
|
|
def _is_command(command):
|
|
return hasattr(command, 'pack_outbound_frame') and hasattr(command, 'unpack_inbound_frame')
|
|
|
|
def _normalize_command(command):
|
|
if _is_command(command):
|
|
return (None, command)
|
|
|
|
if not isinstance(command, tuple):
|
|
raise TypeError('Invalid command form')
|
|
|
|
if len(command) != 2:
|
|
raise TypeError('Invalid command form')
|
|
|
|
if not (command[0] is None or isinstance(command[0], int)):
|
|
raise TypeError('Invalid command form')
|
|
|
|
if not _is_command(command[1]):
|
|
raise TypeError('Invalid command form')
|
|
|
|
return command
|
|
|
|
def _normalize_commands(commands):
|
|
try:
|
|
command = _normalize_command(commands)
|
|
|
|
return ([command], False)
|
|
except TypeError:
|
|
pass
|
|
|
|
try:
|
|
commands = list(commands)
|
|
except TypeError:
|
|
raise TypeError('Commands must be valid command or iterable')
|
|
|
|
if not commands:
|
|
raise ValueError('Commands must not be empty')
|
|
|
|
return ([_normalize_command(command) for command in commands], True)
|
|
|
|
def _pack_outbound_frames(commands):
|
|
frames = []
|
|
response_lengths = []
|
|
|
|
for (address, command) in commands:
|
|
frame = command.pack_outbound_frame()
|
|
response_length = command.response_length or 1
|
|
|
|
frames.append((address, frame))
|
|
response_lengths.append(response_length)
|
|
|
|
return (frames, response_lengths)
|
|
|
|
def _unpack_inbound_frames(frames, commands):
|
|
responses = []
|
|
|
|
for (frame, (_, command)) in zip(frames, commands):
|
|
if isinstance(frame, BaseException):
|
|
responses.append(frame)
|
|
else:
|
|
try:
|
|
response = command.unpack_inbound_frame(frame)
|
|
except ProtocolError as error:
|
|
response = error
|
|
|
|
responses.append(response)
|
|
|
|
return responses
|