2023-01-10 23:30:39 -06:00

191 lines
5.8 KiB
Python

"""
oec.device
~~~~~~~~~~
"""
import time
import logging
from more_itertools import chunked
from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \
TerminalType, LoadAddressCounterLo, LoadSecondaryControl, \
SecondaryControl, ProtocolError
from coax.multiplexer import PORT_MAP_3299
from .interface import ExecuteError
logger = logging.getLogger(__name__)
class Device:
"""A device."""
def __init__(self, interface, device_address):
self.interface = interface
self.device_address = device_address
def setup(self):
"""Setup the device."""
raise NotImplementedError
def get_poll_action(self):
"""Get the POLL action."""
raise NotImplementedError
def execute(self, commands):
"""Execute one or more commands."""
return self.interface.execute(address_commands(self.device_address, commands))
def execute_jumbo_write(self, data, create_first, create_subsequent, first_chunk_max_length_adjustment=-1):
"""Execute a jumbo write command that can be split."""
max_length = None
# The 3299 multiplexer appears to have some frame length limit, after which it will
# stop transmitting. I've not determined the actual limit, but 1024 appears to work.
if self.device_address is not None:
max_length = 1024
elif self.interface.jumbo_write_strategy == 'split':
max_length = self.interface.jumbo_write_max_length
chunks = _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment)
commands = [create_first(chunks[0])]
for chunk in chunks[1:]:
commands.append(create_subsequent(chunk))
if len(commands) > 1 and logger.isEnabledFor(logging.DEBUG):
logger.debug(f'Jumbo write split into {len(commands)}')
return self.execute(commands)
class UnsupportedDeviceError(Exception):
"""Unsupported device."""
def address_commands(device_address, commands):
"""Add device address to commands."""
if isinstance(commands, list):
return [(device_address, command) for command in commands]
return (device_address, commands)
def format_address(interface, device_address):
"""Format a device address."""
if device_address is None:
return f'{interface.identifier}#0'
try:
return f'{interface.identifier}#{PORT_MAP_3299.index(device_address)}'
except ValueError:
return f'{interface.identifier}?{device_address:06b}'
def get_ids(interface, device_address, extended_id_retry_attempts=3):
terminal_id = None
try:
terminal_id = interface.execute(address_commands(device_address, ReadTerminalId()))
except ProtocolError as error:
logger.warning(f'READ_TERMINAL_ID error: {error}')
extended_id = None
if terminal_id is not None and terminal_id.type != TerminalType.DFT:
# The READ_EXTENDED_ID command behaves similarly to the READ_MULTIPLE command and
# will terminate when the two low order bits of the address counter are zero. In
# order to read the entire 4 bytes of the extended ID reliably, we need to set
# the secondary control register to disable "big read" and set the address counter
# accordingly.
#
# The address counter will be reset later during device setup.
commands = [LoadSecondaryControl(SecondaryControl(big=False)), LoadAddressCounterLo(0), ReadExtendedId()]
try:
extended_id = interface.execute(address_commands(device_address, commands))[-1]
except ExecuteError as error:
logger.warning(f'READ_EXTENDED_ID error: {error}')
return (terminal_id, extended_id.hex() if extended_id is not None else None)
def get_features(interface, device_address):
commands = read_feature_ids()
ids = interface.execute(address_commands(device_address, commands))
return parse_features(ids, commands)
def get_keyboard_description(terminal_id, extended_id):
is_3278 = extended_id is None or not int(extended_id[0:2], 16) & 0x80
if is_3278:
description = '3278'
id_map = {
0b0001: 'APL',
0b0010: 'TEXT',
0b0100: 'TYPEWRITER-PSHICO',
0b0101: 'APL',
0b0110: 'TEXT',
0b0111: 'APL-PSHICO',
0b1000: 'DATAENTRY-2',
0b1001: 'DATAENTRY-1',
0b1010: 'TYPEWRITER',
0b1100: 'DATAENTRY-2',
0b1101: 'DATAENTRY-1',
0b1110: 'TYPEWRITER'
}
if terminal_id.keyboard in id_map:
description += '-' + id_map[terminal_id.keyboard]
return description
id_ = int(extended_id[0:2], 16) & 0x1f
is_user = int(extended_id[0:2], 16) & 0x20
if is_user:
description = 'USER'
if id_ in [1, 2, 3, 4]:
description += f'-{id_}'
return description
is_ibm = not int(extended_id[6:8], 16) & 0x80
description = 'IBM' if is_ibm else 'UNKNOWN'
is_enhanced = int(extended_id[6:8], 16) & 0x01
if is_enhanced:
if id_ == 1:
return description + '-ENHANCED'
return None
if id_ == 1:
return description + '-TYPEWRITER'
elif id_ == 2:
return description + '-DATAENTRY'
elif id_ == 3:
return description + '-APL'
return None
def _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment=-1):
if max_length is None:
return [data]
if isinstance(data, tuple):
length = len(data[0]) * data[1]
else:
length = len(data)
first_chunk_max_length = max_length + first_chunk_max_length_adjustment
if length <= first_chunk_max_length:
return [data]
if isinstance(data, tuple):
data = data[0] * data[1]
return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], max_length)]