""" coax.serial_interface ~~~~~~~~~~~~~~~~~~~~~ """ import time import os import struct from copy import copy from contextlib import contextmanager from serial import Serial, SerialException from sliplib import SlipWrapper, ProtocolError from .interface import Interface, InterfaceFeature, normalize_frame from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout class SerialInterface(Interface): """Serial attached 3270 coax interface.""" def __init__(self, serial): if serial is None: raise ValueError('Serial port is required') super().__init__() self.serial = serial self.slip_serial = SlipSerial(self.serial) self.legacy_firmware_detected = None self.legacy_firmware_version = None def reset(self): """Reset the interface.""" original_serial_timeout = self.serial.timeout self.serial.timeout = 5 self.serial.reset_input_buffer() self._write_message(bytes([0x01])) try: message = self._read_message() finally: self.serial.timeout = original_serial_timeout self.serial.reset_input_buffer() if message[0] != 0x01: raise _convert_error(message) if message[1:] == b'\x32\x70': self.legacy_firmware_detected = False self.legacy_firmware_version = None elif len(message) == 4: (major, minor, patch) = struct.unpack('BBB', message[1:]) self.legacy_firmware_detected = True self.legacy_firmware_version = f'{major}.{minor}.{patch}' else: raise InterfaceError(f'Invalid reset response: {message}') # Query features, if this is not a legacy firmware. if not self.legacy_firmware_detected: try: self.features = self._get_features() except InterfaceError: pass def enter_dfu_mode(self): """Enter device firmware upgrade mode.""" message = bytes([0xf2]) self._write_message(message) message = self._read_message() if message[0] != 0x01: raise _convert_error(message) def _get_features(self): """Get interface features.""" message = bytes([0xf0, 0x07]) self._write_message(message) message = self._read_message() if message[0] != 0x01: raise _convert_error(message) known_feature_values = {feature.value for feature in InterfaceFeature} features = {InterfaceFeature(value) for value in message[1:] if value in known_feature_values} return features def _transmit_receive(self, outbound_frames, response_lengths, timeout): if len(response_lengths) != len(outbound_frames): raise ValueError('Response lengths length must equal outbound frames length') if any(address is not None for (address, _) in outbound_frames) and InterfaceFeature.PROTOCOL_3299 not in self.features: raise NotImplementedError('Interface does not support 3299 protocol') # Pack all messages before sending. timeout_milliseconds = self._calculate_timeout_milliseconds(timeout) messages = [_pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds) for ((address, frame), response_length) in zip(outbound_frames, response_lengths)] responses = [] for message in messages: self._write_message(message) message = self._read_message() if message[0] == 0x01: response = _unpack_transmit_receive_response(message[1:]) else: error = _convert_error(message) if not isinstance(error, (ReceiveError, ReceiveTimeout)): raise error response = error responses.append(response) return responses def _calculate_timeout_milliseconds(self, timeout): milliseconds = 0 if timeout: if self.serial.timeout and timeout > self.serial.timeout: raise ValueError('Timeout cannot be greater than serial timeout') milliseconds = int(timeout * 1000) return milliseconds def _read_message(self): try: message = self.slip_serial.recv_msg() except ProtocolError: raise InterfaceError('SLIP protocol error') if len(message) < 4: raise InterfaceError(f'Invalid response message: {message}') (length,) = struct.unpack('>H', message[:2]) if length != len(message) - 4: raise InterfaceError('Response message length mismatch') if length < 1: raise InterfaceError('Empty response message') return message[2:-2] def _write_message(self, message): self.slip_serial.send_msg(struct.pack('>H', len(message)) + message + struct.pack('>H', 0)) @contextmanager def open_serial_interface(serial_port, reset=True): """Opens serial port and initializes serial attached 3270 coax interface.""" with WindowsSafeSerial(serial_port, 115200) as serial: serial.reset_input_buffer() serial.reset_output_buffer() # Allow the interface firmware time to start, this is only required for the # original Arduino Mega based interface. if 'COAX_FAST_START' not in os.environ: time.sleep(3) interface = SerialInterface(serial) if reset: interface.reset() yield interface def _pack_transmit_receive_message(address, frame, response_length, timeout_milliseconds): # Convert the three frame formats to a simple list of 10-bit words with # a repeat count and offset. (words, repeat_count, repeat_offset) = normalize_frame(address, frame) message = bytes([0x06]) # NOTE: Although the frame normalization routine may result in a repeat # offset greater than 1 if an addressed WORD_DATA frame has a repeat # count, this WILL fail to be packed below as it will overflow the # unsigned short field here. Today, oec does not use a repeat with an # addressed WORD_DATA frame as the "jumbo write" function will always # expand addressed frames. message += struct.pack('>H', (repeat_offset << 15) | repeat_count) # Set the 3299 mode flag. if address is not None: words[0] |= 0x8000 for word in words: message += struct.pack('HH', response_length, timeout_milliseconds) return message def _unpack_transmit_receive_response(bytes_): return [(hi << 8) | lo for (lo, hi) in zip(bytes_[::2], bytes_[1::2])] ERROR_MAP = { 1: InterfaceError('Invalid request message'), 2: InterfaceError('Unknown command'), 101: InterfaceError('Receiver active'), 102: ReceiveTimeout(), 103: ReceiveError('Receiver buffer overflow'), 104: ReceiveError('Receiver error') } def _convert_error(message): if message[0] != 0x02: return InterfaceError(f'Invalid response: {message}') if len(message) < 2: return InterfaceError(f'Invalid error response: {message}') if message[1] in ERROR_MAP: error = copy(ERROR_MAP[message[1]]) # Append description if included. if len(message) > 2: description = message[2:].decode('ascii') if error.args: error.args = (f'{error.args[0]}: {description}', *error.args[1:]) else: error.args = (description,) return error return InterfaceError(f'Unknown error: {message[1]}') class SlipSerial(SlipWrapper): """sliplib wrapper for pySerial.""" def send_bytes(self, packet): """Sends a packet over the serial port.""" self.stream.write(packet) self.stream.flush() def recv_bytes(self): """Receive data from the serial port.""" if self.stream.closed: return b'' count = self.stream.in_waiting if count: return self.stream.read(count) byte = self.stream.read(1) if byte == b'': raise InterfaceTimeout() return byte # Based on workaround detailed in https://github.com/pyserial/pyserial/issues/362 class WindowsSafeSerial(Serial): def _reconfigure_port(self, *args, **kwargs): try: super()._reconfigure_port(*args, **kwargs) except SerialException as error: if not self._is_windows_open_error(error): raise def _is_windows_open_error(self, error): message = str(error) if os.name == 'nt' and 'OSError(22, \'The parameter is incorrect.\', None, 87)' in message: return True return False