mirror of
https://github.com/lowobservable/coax.git
synced 2026-02-28 17:39:38 +00:00
289 lines
8.7 KiB
Python
289 lines
8.7 KiB
Python
"""
|
|
coax.serial_interface
|
|
~~~~~~~~~~~~~~~~~~~~~
|
|
"""
|
|
|
|
import time
|
|
import os
|
|
import struct
|
|
from copy import copy
|
|
from contextlib import contextmanager
|
|
from serial import Serial, SerialException
|
|
from sliplib import SlipWrapper, ProtocolError
|
|
|
|
from .interface import Interface, InterfaceFeature, normalize_frame
|
|
from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout
|
|
|
|
class SerialInterface(Interface):
    """Serial attached 3270 coax interface.

    Messages are exchanged with the interface as SLIP framed packets. Each
    packet consists of a 2 byte big-endian payload length, the payload and 2
    trailing bytes.
    """

    def __init__(self, serial):
        """Create an interface on top of the given serial port object.

        Raises ValueError if serial is None.
        """
        if serial is None:
            raise ValueError('Serial port is required')

        super().__init__()

        self.serial = serial

        self.slip_serial = SlipSerial(self.serial)

        # Both remain None until reset() has identified the firmware.
        self.legacy_firmware_detected = None
        self.legacy_firmware_version = None

    def reset(self):
        """Reset the interface.

        Sends the reset command, identifies whether a legacy firmware is
        attached (recording its version if so) and, for a non-legacy firmware,
        queries the supported features.

        Raises the exception built by _convert_error if the interface does not
        report success, and InterfaceError if the reset response is malformed.
        """
        original_serial_timeout = self.serial.timeout

        self.serial.timeout = 5

        # Everything performed under the temporary timeout is guarded, so the
        # original timeout is restored even if flushing or writing fails.
        try:
            self.serial.reset_input_buffer()

            self._write_message(bytes([0x01]))

            message = self._read_message()
        finally:
            self.serial.timeout = original_serial_timeout

        self.serial.reset_input_buffer()

        if message[0] != 0x01:
            raise _convert_error(message)

        if message[1:] == b'\x32\x70':
            self.legacy_firmware_detected = False
            self.legacy_firmware_version = None
        elif len(message) == 4:
            # Legacy firmware answers with a 3 byte major, minor, patch version.
            (major, minor, patch) = struct.unpack('BBB', message[1:])

            self.legacy_firmware_detected = True
            self.legacy_firmware_version = f'{major}.{minor}.{patch}'
        else:
            raise InterfaceError(f'Invalid reset response: {message}')

        # Query features, if this is not a legacy firmware.
        if not self.legacy_firmware_detected:
            try:
                self.features = self._get_features()
            except InterfaceError:
                # Best effort: a failed feature query leaves self.features
                # unchanged rather than failing the reset.
                pass

    def enter_dfu_mode(self):
        """Enter device firmware upgrade mode.

        Raises the exception built by _convert_error if the interface does not
        report success.
        """
        message = bytes([0xf2])

        self._write_message(message)

        message = self._read_message()

        if message[0] != 0x01:
            raise _convert_error(message)

    def _get_features(self):
        """Get interface features.

        Returns the set of InterfaceFeature members reported by the interface;
        reported values that do not match a known InterfaceFeature are ignored.
        """
        message = bytes([0xf0, 0x07])

        self._write_message(message)

        message = self._read_message()

        if message[0] != 0x01:
            raise _convert_error(message)

        known_feature_values = {feature.value for feature in InterfaceFeature}

        features = {InterfaceFeature(value) for value in message[1:] if value in known_feature_values}

        return features

    def _transmit_receive(self, outbound_frames, response_lengths, timeout):
        """Transmit each outbound frame and collect the response to each.

        outbound_frames is a sequence of (address, frame) pairs and
        response_lengths holds one response length per outbound frame.

        Returns a list with one entry per outbound frame: either the list of
        received words, or a ReceiveError / ReceiveTimeout instance. Any other
        error reported by the interface is raised.

        Raises ValueError if the argument lengths differ or the timeout is not
        acceptable, and NotImplementedError if an addressed frame is given but
        the interface lacks the PROTOCOL_3299 feature.
        """
        if len(response_lengths) != len(outbound_frames):
            raise ValueError('Response lengths length must equal outbound frames length')

        if any(address is not None for (address, _) in outbound_frames) and InterfaceFeature.PROTOCOL_3299 not in self.features:
            raise NotImplementedError('Interface does not support 3299 protocol')

        # Pack all messages before sending.
        timeout_milliseconds = self._calculate_timeout_milliseconds(timeout)

        messages = [_pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds)
                    for ((address, frame), response_length) in zip(outbound_frames, response_lengths)]

        responses = []

        for message in messages:
            self._write_message(message)

            message = self._read_message()

            if message[0] == 0x01:
                response = _unpack_transmit_receive_response(message[1:])
            else:
                error = _convert_error(message)

                # Receive errors are returned in place of the response so the
                # remaining frames are still sent; anything else is raised.
                if not isinstance(error, (ReceiveError, ReceiveTimeout)):
                    raise error

                response = error

            responses.append(response)

        return responses

    def _calculate_timeout_milliseconds(self, timeout):
        """Convert a timeout in seconds to whole milliseconds.

        A falsy timeout (None or 0) yields 0.

        Raises ValueError if the timeout exceeds the serial port timeout, or
        if the result does not fit the unsigned 16-bit timeout field of the
        transmit / receive message.
        """
        if not timeout:
            return 0

        if self.serial.timeout and timeout > self.serial.timeout:
            raise ValueError('Timeout cannot be greater than serial timeout')

        milliseconds = int(timeout * 1000)

        # The value is packed as an unsigned short; reject values that would
        # otherwise fail later with an opaque struct.error.
        if not 0 <= milliseconds <= 0xffff:
            raise ValueError('Timeout must be between 0 and 65.535 seconds')

        return milliseconds

    def _read_message(self):
        """Read one SLIP packet and return its payload.

        Raises InterfaceError on a SLIP protocol error, or if the packet is
        too short, its length field does not match, or the payload is empty.
        """
        try:
            message = self.slip_serial.recv_msg()
        except ProtocolError as error:
            raise InterfaceError('SLIP protocol error') from error

        # Smallest valid packet: 2 length bytes plus 2 trailing bytes.
        if len(message) < 4:
            raise InterfaceError(f'Invalid response message: {message}')

        (length,) = struct.unpack('>H', message[:2])

        if length != len(message) - 4:
            raise InterfaceError('Response message length mismatch')

        if length < 1:
            raise InterfaceError('Empty response message')

        return message[2:-2]

    def _write_message(self, message):
        """Send a payload as one SLIP packet.

        The payload is prefixed with its 2 byte big-endian length and followed
        by 2 zero bytes.
        """
        self.slip_serial.send_msg(struct.pack('>H', len(message)) + message +
                                  struct.pack('>H', 0))
|
|
|
|
@contextmanager
def open_serial_interface(serial_port, reset=True):
    """Open a serial port and yield a SerialInterface attached to it.

    The port is opened at 115200 baud and both of its buffers are cleared.
    Unless the COAX_FAST_START environment variable is set, a 3 second pause
    follows. When reset is true the interface is reset before it is yielded.
    The port is closed when the context exits.
    """
    skip_startup_delay = 'COAX_FAST_START' in os.environ

    with WindowsSafeSerial(serial_port, 115200) as port:
        port.reset_input_buffer()
        port.reset_output_buffer()

        # The original Arduino Mega based interface needs time for its
        # firmware to start; other interfaces can skip the wait.
        if not skip_startup_delay:
            time.sleep(3)

        coax_interface = SerialInterface(port)

        if reset:
            coax_interface.reset()

        yield coax_interface
|
|
|
|
def _pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds):
    """Build the transmit / receive command message for a single frame.

    Layout: command byte 0x06, a big-endian unsigned short combining the
    repeat offset (bit 15) and repeat count, the frame words as little-endian
    unsigned shorts, then the response length and timeout in milliseconds as
    big-endian unsigned shorts.
    """
    # Reduce the three frame formats to a plain list of 10-bit words plus a
    # repeat count and offset.
    (words, repeat_count, repeat_offset) = normalize_frame(address, frame)

    # NOTE: Normalizing an addressed WORD_DATA frame that has a repeat count
    # may give a repeat offset greater than 1. That WILL fail to pack here, as
    # it overflows the unsigned short field. Today, oec never repeats an
    # addressed WORD_DATA frame because its "jumbo write" function always
    # expands addressed frames.
    header = struct.pack('>BH', 0x06, (repeat_offset << 15) | repeat_count)

    # Set the 3299 mode flag on the first word of an addressed frame.
    if address is not None:
        words[0] |= 0x8000

    payload = struct.pack(f'<{len(words)}H', *words)

    trailer = struct.pack('>HH', response_length, timeout_milliseconds)

    return header + payload + trailer
|
|
|
|
def _unpack_transmit_receive_response(bytes_):
|
|
return [(hi << 8) | lo for (lo, hi) in zip(bytes_[::2], bytes_[1::2])]
|
|
|
|
# Prototype exceptions keyed by the error code carried in the second byte of
# an error response. _convert_error() copies the matching entry before
# attaching any description, so these shared instances are not modified.
ERROR_MAP = {
    # Errors about the request itself.
    1: InterfaceError('Invalid request message'),
    2: InterfaceError('Unknown command'),

    # Errors reported for the receiver.
    101: InterfaceError('Receiver active'),
    102: ReceiveTimeout(),
    103: ReceiveError('Receiver buffer overflow'),
    104: ReceiveError('Receiver error')
}
|
|
|
|
def _convert_error(message):
    """Convert a non-success response message into an exception instance.

    The exception is returned, not raised. An error response starts with
    0x02, followed by an error code and an optional description. The code is
    looked up in ERROR_MAP and any description is appended to the message of
    the resulting exception.

    Returns an InterfaceError for a message that is not an error response, is
    too short to carry an error code, or carries an unknown error code.
    """
    if message[0] != 0x02:
        return InterfaceError(f'Invalid response: {message}')

    if len(message) < 2:
        return InterfaceError(f'Invalid error response: {message}')

    code = message[1]

    if code not in ERROR_MAP:
        return InterfaceError(f'Unknown error: {code}')

    # Work on a copy so the shared prototype in ERROR_MAP is left untouched.
    error = copy(ERROR_MAP[code])

    # Append description if included.
    if len(message) > 2:
        # Decode leniently: a stray non-ASCII byte in the description must not
        # raise UnicodeDecodeError and hide the error being reported.
        description = message[2:].decode('ascii', errors='replace')

        if error.args:
            error.args = (f'{error.args[0]}: {description}', *error.args[1:])
        else:
            error.args = (description,)

    return error
|
|
|
|
class SlipSerial(SlipWrapper):
    """Adapter that lets sliplib send and receive over a pySerial port."""

    def send_bytes(self, packet):
        """Write a packet to the serial port and flush it."""
        port = self.stream

        port.write(packet)
        port.flush()

    def recv_bytes(self):
        """Return the next available bytes from the serial port.

        Returns b'' if the port is closed. If bytes are already waiting, all
        of them are read at once; otherwise a single byte is read. Raises
        InterfaceTimeout if that single byte read returns nothing.
        """
        port = self.stream

        if port.closed:
            return b''

        pending = port.in_waiting

        if pending:
            return port.read(pending)

        # Nothing buffered yet, wait for the first byte to arrive.
        first = port.read(1)

        if first != b'':
            return first

        raise InterfaceTimeout()
|
|
|
|
# Based on workaround detailed in https://github.com/pyserial/pyserial/issues/362
|
|
class WindowsSafeSerial(Serial):
    """Serial port that ignores one specific reconfigure error on Windows."""

    def _reconfigure_port(self, *args, **kwargs):
        """Reconfigure the port, suppressing only the known Windows error."""
        try:
            super()._reconfigure_port(*args, **kwargs)
        except SerialException as error:
            if self._is_windows_open_error(error):
                return

            raise

    def _is_windows_open_error(self, error):
        """Return True if running on Windows and error is the known open error."""
        on_windows = os.name == 'nt'

        return on_windows and 'OSError(22, \'The parameter is incorrect.\', None, 87)' in str(error)
|