Refactor controller and add device

This commit is contained in:
Andrew Kay
2021-11-11 19:15:29 -06:00
parent b472b5ccb1
commit 8b5421cf3c
13 changed files with 604 additions and 489 deletions

View File

@@ -2,26 +2,30 @@ import os
import signal
import logging
import argparse
from coax import open_serial_interface
from coax import open_serial_interface, TerminalType
from .interface import InterfaceWrapper
from .controller import Controller
from .device import get_ids, get_features, UnsupportedDeviceError
from .terminal import Terminal
from .tn3270 import TN3270Session
# VT100 emulation is not supported on Windows.
is_vt100_available = False
IS_VT100_AVAILABLE = False
if os.name == 'posix':
from .vt100 import VT100Session
is_vt100_available = True
IS_VT100_AVAILABLE = True
from .keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from .keymap_3483 import KEYMAP as KEYMAP_3483
logging.basicConfig(level=logging.INFO)
controller = None
logger = logging.getLogger('oec')
CONTROLLER = None
def _get_keymap(terminal_id, extended_id):
keymap = KEYMAP_3278_2
@@ -34,35 +38,58 @@ def _get_keymap(terminal_id, extended_id):
return keymap
def _create_session(args, terminal):
if args.emulator == 'tn3270':
return TN3270Session(terminal, args.host, args.port)
def _create_device(args, interface, device_address, poll_response):
# Read the terminal identifiers.
(terminal_id, extended_id) = get_ids(interface, device_address)
if args.emulator == 'vt100' and is_vt100_available:
logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}')
if terminal_id.type != TerminalType.CUT:
raise UnsupportedDeviceError('Only CUT type terminals are supported')
# Read the terminal features.
features = get_features(interface, device_address)
logger.info(f'Features = {features}')
# Get the keymap.
keymap = _get_keymap(terminal_id, extended_id)
logger.info(f'Keymap = {keymap.name}')
# Create the terminal.
terminal = Terminal(interface, device_address, terminal_id, extended_id, features, keymap)
return terminal
def _create_session(args, device):
if args.emulator == 'tn3270':
return TN3270Session(device, args.host, args.port)
if args.emulator == 'vt100' and IS_VT100_AVAILABLE:
host_command = [args.command, *args.command_args]
return VT100Session(terminal, host_command)
return VT100Session(device, host_command)
raise ValueError('Unsupported emulator')
def _signal_handler(number, frame):
global controller
global CONTROLLER
print('Stopping controller...')
if controller:
controller.stop()
if CONTROLLER:
CONTROLLER.stop()
controller = None
CONTROLLER = None
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
def main():
global controller
global CONTROLLER
parser = argparse.ArgumentParser(description=('An open replacement for the IBM 3174 '
'Establishment Controller'))
parser = argparse.ArgumentParser(description='IBM 3270 terminal controller')
parser.add_argument('serial_port', help='Serial port')
@@ -75,7 +102,7 @@ def main():
tn3270_parser.add_argument('host', help='Hostname')
tn3270_parser.add_argument('port', nargs='?', default=23, type=int)
if is_vt100_available:
if IS_VT100_AVAILABLE:
vt100_parser = subparsers.add_parser('vt100', description='VT100 emulator',
help='VT100 emulator')
@@ -85,14 +112,15 @@ def main():
args = parser.parse_args()
with open_serial_interface(args.serial_port) as interface:
create_session = lambda terminal: _create_session(args, terminal)
create_device = lambda interface, device_address, poll_response: _create_device(args, interface, device_address, poll_response)
create_session = lambda device: _create_session(args, device)
controller = Controller(InterfaceWrapper(interface), _get_keymap, create_session)
with open_serial_interface(args.serial_port) as interface:
CONTROLLER = Controller(InterfaceWrapper(interface), create_device, create_session)
print('Starting controller...')
controller.run()
CONTROLLER.run()
if __name__ == '__main__':
main()

View File

@@ -6,39 +6,37 @@ oec.controller
import time
import logging
import selectors
from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout, \
ReceiveError, ProtocolError
from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout
from .interface import address_commands
from .terminal import create_terminal, UnsupportedTerminalError
from .device import address_commands, format_address, UnsupportedDeviceError
from .keyboard import Key
from .session import SessionDisconnectedError
class Controller:
"""The controller."""
def __init__(self, interface, get_keymap, create_session):
def __init__(self, interface, create_device, create_session):
self.logger = logging.getLogger(__name__)
self.interface = interface
self.running = False
self.interface = interface
self.get_keymap = get_keymap
self.create_device = create_device
self.create_session = create_session
self.terminal = None
self.session = None
self.device = None
self.session = None
self.session_selector = None
# Target time between POLL commands in seconds when a terminal is connected or
# no terminal is connected.
# Target time between POLL commands in seconds when a device is attached or
# no device is attached.
#
# The connected poll period only applies in cases where the terminal responded
# with TR/TA to the last poll - this is an effort to improve the keystroke
# The attached poll period only applies in cases where the device responded
# with TT/AR to the last poll - this is an effort to improve the keystroke
# responsiveness.
self.connected_poll_period = 1 / 10
self.disconnected_poll_period = 5
self.attached_poll_period = 1 / 10
self.detatached_poll_period = 5
self.last_poll_time = None
self.last_poll_response = None
@@ -58,82 +56,54 @@ class Controller:
self.session_selector = None
if self.terminal:
self.terminal = None
if self.device:
self.device = None
def stop(self):
"""Stop the controller."""
self.running = False
def _run_loop(self):
device_address = None
poll_delay = self._calculate_poll_delay(time.perf_counter())
# If POLLing is delayed, handle the host output, otherwise just sleep.
if poll_delay > 0:
if self.session:
try:
self._update_session(poll_delay)
except SessionDisconnectedError:
self._handle_session_disconnected()
self._update_session(poll_delay)
else:
time.sleep(poll_delay)
# POLL devices.
self._poll_attached_device()
self._poll_detatched_device()
def _update_session(self, duration):
try:
poll_response = self._poll(device_address)
except ReceiveTimeout:
if self.terminal:
self._handle_terminal_detached()
update_count = 0
return
except ReceiveError as error:
self.logger.warning(f'POLL receive error: {error}', exc_info=error)
return
except ProtocolError as error:
self.logger.warning(f'POLL protocol error: {error}', exc_info=error)
return
while duration > 0:
start_time = time.perf_counter()
if not self.terminal:
try:
self._handle_terminal_attached(device_address, poll_response)
except UnsupportedTerminalError as error:
self.logger.error(f'Unsupported terminal: {error}')
return
selected = self.session_selector.select(duration)
if poll_response:
self._handle_poll_response(poll_response)
if not selected:
break
def _handle_terminal_attached(self, device_address, poll_response):
self.logger.info('Terminal attached')
for (key, _) in selected:
session = key.fileobj
self.terminal = create_terminal(self.interface, device_address, poll_response,
self.get_keymap)
if session.handle_host():
update_count += 1
self.terminal.setup()
duration -= (time.perf_counter() - start_time)
# Show the attached indicator on the status line.
self.terminal.display.status_line.write_string(0, 'S')
# Start the session.
self._start_session()
def _handle_terminal_detached(self):
self.logger.info('Terminal detached')
self._terminate_session()
self.terminal = None
def _handle_session_disconnected(self):
self.logger.info('Session disconnected')
self._terminate_session()
# Restart the session.
self._start_session()
if update_count > 0:
self.session.render()
except SessionDisconnectedError:
self._handle_session_disconnected()
def _start_session(self):
self.session = self.create_session(self.terminal)
self.session = self.create_session(self.device)
self.session.start()
@@ -149,36 +119,90 @@ class Controller:
self.session = None
def _update_session(self, duration):
update_count = 0
def _handle_session_disconnected(self):
self.logger.info('Session disconnected')
while duration > 0:
start_time = time.perf_counter()
self._terminate_session()
selected = self.session_selector.select(duration)
# Restart the session.
self._start_session()
if not selected:
break
def _poll_attached_device(self):
if not self.device:
return
for (key, events) in selected:
session = key.fileobj
self.last_poll_time = time.perf_counter()
if session.handle_host():
update_count += 1
try:
poll_response = self.device.poll()
except ReceiveTimeout:
self._handle_device_lost()
return
duration -= (time.perf_counter() - start_time)
if poll_response:
self._poll_ack(self.device.device_address)
if update_count > 0:
self.session.render()
self._handle_poll_response(poll_response)
self.last_poll_response = poll_response
def _poll_detatched_device(self):
if self.device:
return
self.last_poll_time = time.perf_counter()
device_address = None
try:
poll_response = self._poll(device_address)
except ReceiveTimeout:
return
if poll_response:
self._poll_ack(device_address)
self._handle_device_found(device_address, poll_response)
self.last_poll_response = poll_response
def _handle_device_found(self, device_address, poll_response):
self.logger.info(f'Found device @ {format_address(self.interface, device_address)}')
try:
device = self.create_device(self.interface, device_address, poll_response)
except UnsupportedDeviceError as error:
self.logger.error(f'Unsupported device @ {format_address(self.interface, device_address)}: {error}')
return
device.setup()
self.device = device
self.logger.info(f'Attached device @ {format_address(self.interface, device_address)}')
self._start_session()
def _handle_device_lost(self):
device_address = self.device.device_address
self.logger.info(f'Lost device @ {format_address(self.interface, device_address)}')
self._terminate_session()
self.device = None
self.logger.info(f'Detached device @ {format_address(self.interface, device_address)}')
def _handle_poll_response(self, poll_response):
if isinstance(poll_response, KeystrokePollResponse):
self._handle_keystroke_poll_response(poll_response)
def _handle_keystroke_poll_response(self, poll_response):
terminal = self.device
scan_code = poll_response.scan_code
(key, modifiers, modifiers_changed) = self.terminal.keyboard.get_key(scan_code)
(key, modifiers, modifiers_changed) = terminal.keyboard.get_key(scan_code)
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug((f'Keystroke detected: Scan Code = {scan_code}, '
@@ -186,41 +210,27 @@ class Controller:
# Update the status line if modifiers have changed.
if modifiers_changed:
self.terminal.display.status_line.write_keyboard_modifiers(modifiers)
terminal.display.status_line.write_keyboard_modifiers(modifiers)
if not key:
return
if key == Key.CURSOR_BLINK:
self.terminal.display.toggle_cursor_blink()
terminal.display.toggle_cursor_blink()
elif key == Key.ALT_CURSOR:
self.terminal.display.toggle_cursor_reverse()
terminal.display.toggle_cursor_reverse()
elif key == Key.CLICKER:
self.terminal.keyboard.toggle_clicker()
terminal.keyboard.toggle_clicker()
elif self.session:
self.session.handle_key(key, modifiers, scan_code)
self.session.render()
def _poll(self, device_address):
self.last_poll_time = time.perf_counter()
return self.interface.execute(address_commands(device_address, Poll()))
# If a terminal is connected, use the terminal method to ensure that
# any queued POLL action is applied.
if self.terminal:
poll_response = self.terminal.poll()
else:
poll_response = self.interface.execute(address_commands(device_address, Poll()))
if poll_response:
try:
self.interface.execute(address_commands(device_address, PollAck()))
except ProtocolError as error:
self.logger.warning(f'POLL_ACK protocol error: {error}', exc_info=error)
self.last_poll_response = poll_response
return poll_response
def _poll_ack(self, device_address):
self.interface.execute(address_commands(device_address, PollAck()))
def _calculate_poll_delay(self, current_time):
if self.last_poll_response is not None:
@@ -229,9 +239,9 @@ class Controller:
if self.last_poll_time is None:
return 0
if self.terminal:
period = self.connected_poll_period
if self.device:
period = self.attached_poll_period
else:
period = self.disconnected_poll_period
period = self.detatached_poll_period
return max((self.last_poll_time + period) - current_time, 0)

151
oec/device.py Normal file
View File

@@ -0,0 +1,151 @@
"""
oec.device
~~~~~~~~~~
"""
import os
import time
import logging
from more_itertools import chunked
from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \
Feature, ProtocolError
logger = logging.getLogger(__name__)
class Device:
"""A device."""
def __init__(self, interface, device_address):
self.interface = interface
self.device_address = device_address
def setup(self):
"""Setup the device."""
raise NotImplementedError
def poll(self):
"""POLL the device."""
raise NotImplementedError
def execute(self, commands):
"""Execute one or more commands."""
return self.interface.execute(address_commands(self.device_address, commands))
def execute_jumbo_write(self, data, create_first, create_subsequent, first_chunk_max_length_adjustment=-1):
"""Execute a jumbo write command that can be split."""
max_length = None
# The 3299 multiplexer appears to have some frame length limit, after which it will
# stop transmitting. I've not determined the actual limit, but 1024 appears to work.
if self.device_address is not None:
max_length = 1024
elif self.interface.jumbo_write_strategy == 'split':
max_length = self.interface.jumbo_write_max_length
chunks = _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment)
commands = [create_first(chunks[0])]
for chunk in chunks[1:]:
commands.append(create_subsequent(chunk))
if len(commands) > 1 and logger.isEnabledFor(logging.DEBUG):
logger.debug(f'Jumbo write split into {len(commands)}')
return self.execute(commands)
class UnsupportedDeviceError(Exception):
"""Unsupported device."""
def address_commands(device_address, commands):
"""Add device address to commands."""
if isinstance(commands, list):
return [(device_address, command) for command in commands]
return (device_address, commands)
def format_address(interface, device_address):
"""Format a device address."""
if device_address is None:
return interface.identifier
raise NotImplementedError
def get_ids(interface, device_address, extended_id_retry_attempts=3):
terminal_id = None
extended_id = None
try:
terminal_id = interface.execute(address_commands(device_address, ReadTerminalId()))
except ProtocolError as error:
logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error)
# Retry the READ_EXTENDED_ID command as it appears to fail frequently on the
# first request - unlike the READ_TERMINAL_ID command,
extended_id = None
for attempt in range(extended_id_retry_attempts):
try:
extended_id = interface.execute(address_commands(device_address, ReadExtendedId()))
break
except ProtocolError as error:
logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error)
time.sleep(0.25)
return (terminal_id, extended_id.hex() if extended_id is not None else None)
def get_features(interface, device_address):
commands = read_feature_ids()
ids = interface.execute([address_commands(device_address, command) for command in commands])
features = parse_features(ids, commands)
# Add override features - for example, this can be used to add an unreported
# EAB feature to a IBM 3179 terminal.
if 'COAX_FEATURES' in os.environ:
for override in os.environ['COAX_FEATURES'].split(','):
if '@' not in override:
logger.warning(f'Invalid feature override: {override}')
continue
(name, address) = override.split('@')
try:
feature = Feature[name]
except KeyError:
logger.warning(f'Invalid feature override: {override}')
continue
try:
address = int(address)
except ValueError:
logger.warning(f'Invalid feature override: {override}')
continue
logger.info(f'Adding override feature {feature} @ {address}')
features[feature] = address
return features
def _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment=-1):
if max_length is None:
return [data]
if isinstance(data, tuple):
length = len(data[0]) * data[1]
else:
length = len(data)
first_chunk_max_length = max_length + first_chunk_max_length_adjustment
if length <= first_chunk_max_length:
return [data]
if isinstance(data, tuple):
data = data[0] * data[1]
return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], max_length)]

View File

@@ -32,6 +32,9 @@ class InterfaceWrapper:
_print_i1_jumbo_write_notice(self.jumbo_write_max_length)
def __getattr__(self, attr):
if attr == 'identifier':
return self.interface.serial.port
return getattr(self.interface, attr)
def execute(self, commands):
@@ -47,12 +50,6 @@ class InterfaceWrapper:
return responses
def address_commands(device_address, commands):
if isinstance(commands, list):
return [(device_address, command) for command in commands]
return (device_address, commands)
def _get_jumbo_write_strategy():
value = os.environ.get('COAX_JUMBO')

View File

@@ -3,20 +3,12 @@ oec.terminal
~~~~~~~~~~~~
"""
import os
import time
import logging
from more_itertools import chunked
from coax import read_feature_ids, parse_features, Poll, ReadTerminalId, ReadExtendedId, \
LoadControlRegister, TerminalType, Feature, PollAction, Control, \
ProtocolError
from coax import Poll, LoadControlRegister, Feature, PollAction, Control
from .interface import address_commands
from .device import Device, UnsupportedDeviceError
from .display import Dimensions, BufferedDisplay
from .keyboard import Keyboard
logger = logging.getLogger(__name__)
MODEL_DIMENSIONS = {
2: Dimensions(24, 80),
3: Dimensions(32, 80),
@@ -24,17 +16,21 @@ MODEL_DIMENSIONS = {
5: Dimensions(27, 132)
}
class Terminal:
class Terminal(Device):
"""The terminal."""
def __init__(self, interface, device_address, terminal_id, extended_id, dimensions,
features, keymap):
self.interface = interface
self.device_address = device_address
def __init__(self, interface, device_address, terminal_id, extended_id, features, keymap):
super().__init__(interface, device_address)
self.terminal_id = terminal_id
self.extended_id = extended_id
self.features = features
dimensions = MODEL_DIMENSIONS.get(terminal_id.model)
if not dimensions:
raise UnsupportedDeviceError(f'Terminal model {terminal_id.model} is not supported')
self.control = Control(step_inhibit=False, display_inhibit=False,
cursor_inhibit=False, cursor_reverse=False,
cursor_blink=False)
@@ -54,6 +50,9 @@ class Terminal:
self.display.clear(clear_status_line=True)
# Show the attached indicator on the status line.
self.display.status_line.write_string(0, 'OEC')
def poll(self):
"""Execute a POLL command with queued actions."""
poll_action = PollAction.NONE
@@ -86,137 +85,3 @@ class Terminal:
def load_control_register(self):
"""Execute a LOAD_CONTROL_REGISTER command."""
self.execute(LoadControlRegister(self.control))
def execute(self, commands):
return self.interface.execute(address_commands(self.device_address, commands))
def execute_jumbo_write(self, data, create_first, create_subsequent, first_chunk_max_length_adjustment=-1):
max_length = None
if self.interface.jumbo_write_strategy == 'split':
max_length = self.interface.jumbo_write_max_length
chunks = _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment)
commands = [create_first(chunks[0])]
for chunk in chunks[1:]:
commands.append(create_subsequent(chunk))
return self.execute(commands)
class UnsupportedTerminalError(Exception):
"""Unsupported terminal."""
def create_terminal(interface, device_address, poll_response, get_keymap):
"""Terminal factory."""
# Read the terminal identifiers.
(terminal_id, extended_id) = _read_terminal_ids(interface, device_address)
logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}')
if terminal_id.type != TerminalType.CUT:
raise UnsupportedTerminalError('Only CUT type terminals are supported')
# Get the terminal dimensions.
dimensions = MODEL_DIMENSIONS.get(terminal_id.model)
if dimensions is None:
raise UnsupportedTerminalError(f'Model {terminal_id.model} is not supported')
logger.info(f'Rows = {dimensions.rows}, Columns = {dimensions.columns}')
# Get the terminal features.
features = _get_features(interface, device_address)
logger.info(f'Features = {features}')
# Get the keymap.
keymap = get_keymap(terminal_id, extended_id)
logger.info(f'Keymap = {keymap.name}')
# Create the terminal.
terminal = Terminal(interface, device_address, terminal_id, extended_id, dimensions,
features, keymap)
return terminal
def _read_terminal_ids(interface, device_address, extended_id_retry_attempts=3):
terminal_id = None
extended_id = None
try:
terminal_id = interface.execute(address_commands(device_address, ReadTerminalId()))
except ProtocolError as error:
logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error)
# Retry the READ_EXTENDED_ID command as it appears to fail frequently on the
# first request - unlike the READ_TERMINAL_ID command,
extended_id = None
for attempt in range(extended_id_retry_attempts):
try:
extended_id = interface.execute(address_commands(device_address, ReadExtendedId()))
break
except ProtocolError as error:
logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error)
time.sleep(0.25)
return (terminal_id, extended_id.hex() if extended_id is not None else None)
def _get_features(interface, device_address):
commands = read_feature_ids()
ids = interface.execute([address_commands(device_address, command) for command in commands])
features = parse_features(ids, commands)
# Add override features - for example, this can be used to add an unreported
# EAB feature to a IBM 3179 terminal.
if 'COAX_FEATURES' in os.environ:
for override in os.environ['COAX_FEATURES'].split(','):
if '@' not in override:
logger.warning(f'Invalid feature override: {override}')
continue
(name, address) = override.split('@')
try:
feature = Feature[name]
except KeyError:
logger.warning(f'Invalid feature override: {override}')
continue
try:
address = int(address)
except ValueError:
logger.warning(f'Invalid feature override: {override}')
continue
logger.info(f'Adding override feature {feature} @ {address}')
features[feature] = address
return features
def _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment=-1):
if max_length is None:
return [data]
if isinstance(data, tuple):
length = len(data[0]) * data[1]
else:
length = len(data)
first_chunk_max_length = max_length + first_chunk_max_length_adjustment
if length <= first_chunk_max_length:
return [data]
if isinstance(data, tuple):
data = data[0] * data[1]
return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], max_length)]

View File

@@ -7,6 +7,8 @@ class MockInterface(Interface):
def __init__(self, responses=[]):
self.mock_responses = responses
self.serial = Mock(port='/dev/mock')
self.legacy_firmware_detected = None
self.legacy_firmware_version = None

View File

@@ -4,15 +4,15 @@ from unittest.mock import Mock, create_autospec, patch
import selectors
from selectors import BaseSelector
from logging import Logger
from coax import Poll, PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout
from coax import Poll, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout
from coax.protocol import TerminalId
import context
from oec.interface import InterfaceWrapper
from oec.controller import Controller
from oec.terminal import Terminal, UnsupportedTerminalError
from oec.display import Dimensions
from oec.device import UnsupportedDeviceError
from oec.terminal import Terminal
from oec.keyboard import KeyboardModifiers, Key
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from oec.session import Session, SessionDisconnectedError
@@ -23,22 +23,9 @@ class RunLoopTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
self.session = create_autospec(Session, instance=True)
self.create_session = Mock(return_value=self.session)
self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', { }, KEYMAP_3278_2)
self.controller = Controller(InterfaceWrapper(self.interface), lambda terminal_id, extended_id: KEYMAP_3278_2, self.create_session)
self.controller.logger = create_autospec(Logger, instance=True)
self.controller.connected_poll_period = 1
self.controller.session_selector = create_autospec(BaseSelector, instance=True)
self.controller._update_session = Mock()
self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2)
self.terminal.setup = Mock()
self.terminal.setup = Mock(Terminal, instance=True)
self.terminal.display.write = Mock()
self.terminal.display.toggle_cursor_blink = Mock()
@@ -46,6 +33,21 @@ class RunLoopTestCase(unittest.TestCase):
self.terminal.keyboard.toggle_clicker = Mock()
self.create_device = Mock(return_value=self.terminal)
self.session = create_autospec(Session, instance=True)
self.create_session = Mock(return_value=self.session)
self.controller = Controller(InterfaceWrapper(self.interface), self.create_device, self.create_session)
self.controller.logger = create_autospec(Logger, instance=True)
self.controller.attached_poll_period = 1
self.controller.session_selector = create_autospec(BaseSelector, instance=True)
self.controller._update_session = Mock(wraps=self.controller._update_session)
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter = patcher.start()
@@ -54,37 +56,31 @@ class RunLoopTestCase(unittest.TestCase):
self.sleep = patcher.start()
patcher = patch('oec.controller.create_terminal')
self.create_terminal = patcher.start()
self.create_terminal.return_value = self.terminal
self.addCleanup(patch.stopall)
def test_no_terminal(self):
def test_no_device(self):
self._assert_run_loop(0, ReceiveTimeout, False, 0, False)
self._assert_run_loop(1, ReceiveTimeout, False, 4, False)
self.assertIsNone(self.controller.terminal)
self.assertIsNone(self.controller.device)
self.assertIsNone(self.controller.session)
def test_terminal_attached(self):
def test_device_attached(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, None, True, 0.5, False)
self.assertIsNotNone(self.controller.terminal)
self.assertIsNotNone(self.controller.device)
self.assertIsNotNone(self.controller.session)
self.controller._update_session.assert_called()
def test_unsupported_terminal_attached(self):
self.create_terminal.side_effect = [UnsupportedTerminalError]
def test_unsupported_device_attached(self):
self.create_device.side_effect = [UnsupportedDeviceError]
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self.assertIsNone(self.controller.terminal)
self.assertIsNone(self.controller.device)
self.assertIsNone(self.controller.session)
def test_keystroke(self):
@@ -93,30 +89,34 @@ class RunLoopTestCase(unittest.TestCase):
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, None, True, 0.5, False)
self.assertIsNotNone(self.controller.terminal)
self.assertIsNotNone(self.controller.device)
self.assertIsNotNone(self.controller.session)
self.controller.session.handle_key.assert_called_with(Key.LOWER_A, KeyboardModifiers.NONE, 96)
def test_terminal_detached(self):
def test_device_detatched(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, ReceiveTimeout, True, 0.5, False)
self.assertIsNone(self.controller.terminal)
self.assertIsNone(self.controller.device)
self.assertIsNone(self.controller.session)
self.session.terminate.assert_called()
def test_session_disconnected(self):
self.controller._update_session.side_effect = [None, SessionDisconnectedError, None]
selector_key = Mock(fileobj=self.session)
self.controller.session_selector.select.return_value = [(selector_key, selectors.EVENT_READ)]
self.session.handle_host = Mock(side_effect=[None, SessionDisconnectedError])
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, None, True, 0.5, False)
self._assert_run_loop(1.5, None, True, 0.5, False)
self._assert_run_loop(1.5, None, True, 3, False)
self.assertIsNotNone(self.controller.terminal)
self.assertIsNotNone(self.controller.device)
self.assertIsNotNone(self.controller.session)
self.assertEqual(self.create_session.call_count, 2)
@@ -174,7 +174,16 @@ class RunLoopTestCase(unittest.TestCase):
self.interface.reset_mock()
self.perf_counter.side_effect = [poll_time, poll_time + expected_poll_delay]
def perf_counter():
nonlocal poll_time
time = poll_time
poll_time += expected_update_session
return time
self.perf_counter.side_effect = perf_counter
self.sleep.reset_mock()

189
tests/test_device.py Normal file
View File

@@ -0,0 +1,189 @@
import unittest
from unittest.mock import Mock, patch
from logging import Logger
from coax import TerminalType, Feature, ReadAddressCounterHi, ReadAddressCounterLo, ReadTerminalId, ReadExtendedId, ReadFeatureId, ProtocolError
from coax.protocol import TerminalId
import context
from oec.interface import InterfaceWrapper
from oec.device import address_commands, format_address, get_ids, get_features, _jumbo_write_split_data
from mock_interface import MockInterface
class AddressCommandsTestCase(unittest.TestCase):
def test_single_command(self):
# Arrange
command = ReadAddressCounterHi()
# Act
result = address_commands(0b111000, command)
# Assert
self.assertEqual(result, (0b111000, command))
def test_multiple_commands(self):
# Arrange
commands = [ReadAddressCounterHi(), ReadAddressCounterLo()]
# Act
result = address_commands(0b111000, commands)
# Assert
self.assertEqual(result, [(0b111000, commands[0]), (0b111000, commands[1])])
class FormatAddressTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
def test_no_port(self):
self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock')
def test_multiplexer(self):
with self.assertRaises(NotImplementedError):
format_address(InterfaceWrapper(self.interface), 0b110000)
class GetIdsTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
patcher = patch('oec.device.logger', autospec=Logger)
self.logger = patcher.start()
patcher = patch('oec.device.time.sleep')
self.sleep = patcher.start()
self.addCleanup(patch.stopall)
def test_no_extended_id(self):
# Arrange
self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b11110100))]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertIsNone(extended_id)
def test_extended_id(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, bytes.fromhex('01 02 03 04'))
]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertEqual(extended_id, '01020304')
def test_extended_id_second_attempt(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, Mock(side_effect=[ProtocolError, bytes.fromhex('01 02 03 04')]))
]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertEqual(extended_id, '01020304')
self.logger.warning.assert_called()
def test_extended_id_failed(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, Mock(side_effect=ProtocolError))
]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertIsNone(extended_id)
self.logger.warning.assert_called()
class GetFeaturesTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
def test_no_features(self):
# Act
features = get_features(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(features, { })
def test_eab_feature(self):
# Arrange
self.interface.mock_responses = [(None, ReadFeatureId, lambda command: command.feature_address == 7, Feature.EAB.value)]
# Act
features = get_features(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(features, { Feature.EAB: 7 })
def test_override(self):
# Act
with patch.dict('oec.device.os.environ', { 'COAX_FEATURES': 'EAB@7' }):
features = get_features(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(features, { Feature.EAB: 7 })
class JumboWriteSplitDataTestCase(unittest.TestCase):
def test_no_split_strategy(self):
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, None)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_one_chunk(self):
for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_two_chunks(self):
for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 2)
self.assertEqual(len(result[0]), 31)
def test_split_strategy_three_chunks(self):
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 3)
self.assertEqual(len(result[0]), 31)
self.assertEqual(len(result[1]), 32)

View File

@@ -842,25 +842,23 @@ class EncodeStringTestCase(unittest.TestCase):
def _create_display(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap)
display = Display(terminal, dimensions, features.get(Feature.EAB))
display = Display(terminal, terminal.display.dimensions, features.get(Feature.EAB))
return display
def _create_buffered_display(interface, has_eab=False):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { Feature.EAB: 7 } if has_eab else { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap)
buffered_display = BufferedDisplay(terminal, dimensions, features.get(Feature.EAB))
buffered_display = BufferedDisplay(terminal, terminal.display.dimensions, features.get(Feature.EAB))
return buffered_display

View File

@@ -5,7 +5,7 @@ from coax import ReadAddressCounterHi, ReadAddressCounterLo, ProtocolError
import context
from oec.interface import InterfaceWrapper, AggregateExecuteError, address_commands
from oec.interface import InterfaceWrapper, AggregateExecuteError
from mock_interface import MockInterface
@@ -105,24 +105,3 @@ class InterfaceWrapperExecuteTestCase(unittest.TestCase):
self.assertIsInstance(error.errors[0], ProtocolError)
self.assertEqual(len(error.responses), 2)
class AddressCommandsTestCase(unittest.TestCase):
def test_single_command(self):
# Arrange
command = ReadAddressCounterHi()
# Act
result = address_commands(0b111000, command)
# Assert
self.assertEqual(result, (0b111000, command))
def test_multiple_commands(self):
# Arrange
commands = [ReadAddressCounterHi(), ReadAddressCounterLo()]
# Act
result = address_commands(0b111000, commands)
# Assert
self.assertEqual(result, [(0b111000, commands[0]), (0b111000, commands[1])])

View File

@@ -1,17 +1,36 @@
import unittest
from unittest.mock import Mock, create_autospec, patch
from coax import Poll, PollAction, TerminalType, Feature, ReadTerminalId, ReadExtendedId, ReadFeatureId
from unittest.mock import create_autospec
from coax import Poll, PollAction
from coax.protocol import TerminalId
import context
from oec.interface import InterfaceWrapper
from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError, _jumbo_write_split_data
from oec.display import Display, Dimensions
from oec.device import UnsupportedDeviceError
from oec.terminal import Terminal
from oec.display import Display, StatusLine
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from mock_interface import MockInterface
class InitTerminalTestCase(unittest.TestCase):
def test_supported_terminal_model(self):
# Arrange
terminal_id = TerminalId(0b11110100)
# Act
Terminal(None, None, terminal_id, None, { }, KEYMAP_3278_2)
def test_unsupported_terminal_model(self):
# Arrange
terminal_id = TerminalId(0b11110100)
terminal_id.model = 1
# Act and assert
with self.assertRaises(UnsupportedDeviceError):
Terminal(None, None, terminal_id, None, { }, KEYMAP_3278_2)
class TerminalSetupTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
@@ -19,6 +38,7 @@ class TerminalSetupTestCase(unittest.TestCase):
self.terminal = _create_terminal(self.interface)
self.terminal.display = create_autospec(Display, instance=True)
self.terminal.display.status_line = create_autospec(StatusLine, instance=True)
def test(self):
self.terminal.setup()
@@ -70,143 +90,12 @@ class TerminalPollTestCase(unittest.TestCase):
def assert_poll_with_poll_action(self, action):
self.interface.assert_command_executed(None, Poll, lambda command: command.action == action)
class CreateTerminalTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
self.get_keymap = lambda terminal_id, extended_id: KEYMAP_3278_2
def test_supported_terminal_with_no_features(self):
# Arrange
interface = InterfaceWrapper(self.interface)
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, bytes.fromhex('00 00 00 00'))
]
# Act
terminal = create_terminal(interface, None, None, self.get_keymap)
# Assert
self.assertEqual(terminal.terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal.terminal_id.model, 2)
self.assertEqual(terminal.terminal_id.keyboard, 15)
self.assertEqual(terminal.extended_id, '00000000')
self.assertEqual(terminal.display.dimensions, Dimensions(24, 80))
self.assertEqual(terminal.features, { })
self.assertEqual(terminal.keyboard.keymap.name, '3278-2')
def test_supported_terminal_with_eab_feature(self):
# Arrange
interface = InterfaceWrapper(self.interface)
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, bytes.fromhex('c1 34 83 00')),
(None, ReadFeatureId, lambda command: command.feature_address == 7, Feature.EAB.value)
]
# Act
terminal = create_terminal(interface, None, None, self.get_keymap)
# Assert
self.assertEqual(terminal.terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal.terminal_id.model, 2)
self.assertEqual(terminal.terminal_id.keyboard, 15)
self.assertEqual(terminal.extended_id, 'c1348300')
self.assertEqual(terminal.display.dimensions, Dimensions(24, 80))
self.assertEqual(terminal.features, { Feature.EAB: 7 })
self.assertEqual(terminal.keyboard.keymap.name, '3278-2')
def test_supported_terminal_features_override(self):
# Arrange
interface = InterfaceWrapper(self.interface)
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, bytes.fromhex('00 00 00 00'))
]
# Act
with patch.dict('oec.terminal.os.environ', { 'COAX_FEATURES': 'EAB@7' }):
terminal = create_terminal(interface, None, None, self.get_keymap)
# Assert
self.assertEqual(terminal.terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal.terminal_id.model, 2)
self.assertEqual(terminal.terminal_id.keyboard, 15)
self.assertEqual(terminal.extended_id, '00000000')
self.assertEqual(terminal.display.dimensions, Dimensions(24, 80))
self.assertEqual(terminal.features, { Feature.EAB: 7 })
self.assertEqual(terminal.keyboard.keymap.name, '3278-2')
def test_unsupported_terminal_type(self):
# Arrange
interface = InterfaceWrapper(self.interface)
self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b00000001))]
# Act and assert
with self.assertRaises(UnsupportedTerminalError):
create_terminal(interface, None, None, self.get_keymap)
def test_unsupported_terminal_model(self):
# Arrange
interface = InterfaceWrapper(self.interface)
terminal_id = TerminalId(0b11110100)
terminal_id.model = 1
self.interface.mock_responses = [(None, ReadTerminalId, None, terminal_id)]
# Act and assert
with self.assertRaises(UnsupportedTerminalError):
create_terminal(interface, None, None, self.get_keymap)
class JumboWriteSplitDataTestCase(unittest.TestCase):
def test_no_split_strategy(self):
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, None)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_one_chunk(self):
for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_two_chunks(self):
for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 2)
self.assertEqual(len(result[0]), 31)
def test_split_strategy_three_chunks(self):
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 3)
self.assertEqual(len(result[0]), 31)
self.assertEqual(len(result[1]), 32)
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap)
return terminal

View File

@@ -370,11 +370,10 @@ class SessionRenderTestCase(unittest.TestCase):
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap)
terminal.display.status_line = create_autospec(StatusLine, instance=True)

View File

@@ -178,10 +178,9 @@ class SessionRenderTestCase(unittest.TestCase):
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap)
return terminal