From 8b5421cf3c0aa9e32dd98b887fbd87b6c72a1941 Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Thu, 11 Nov 2021 19:15:29 -0600 Subject: [PATCH] Refactor controller and add device --- oec/__main__.py | 70 ++++++++---- oec/controller.py | 224 ++++++++++++++++++++------------------- oec/device.py | 151 ++++++++++++++++++++++++++ oec/interface.py | 9 +- oec/terminal.py | 163 +++------------------------- tests/mock_interface.py | 2 + tests/test_controller.py | 85 ++++++++------- tests/test_device.py | 189 +++++++++++++++++++++++++++++++++ tests/test_display.py | 10 +- tests/test_interface.py | 23 +--- tests/test_terminal.py | 161 +++++----------------------- tests/test_tn3270.py | 3 +- tests/test_vt100.py | 3 +- 13 files changed, 604 insertions(+), 489 deletions(-) create mode 100644 oec/device.py create mode 100644 tests/test_device.py diff --git a/oec/__main__.py b/oec/__main__.py index 875172a..8c697e3 100644 --- a/oec/__main__.py +++ b/oec/__main__.py @@ -2,26 +2,30 @@ import os import signal import logging import argparse -from coax import open_serial_interface +from coax import open_serial_interface, TerminalType from .interface import InterfaceWrapper from .controller import Controller +from .device import get_ids, get_features, UnsupportedDeviceError +from .terminal import Terminal from .tn3270 import TN3270Session # VT100 emulation is not supported on Windows. -is_vt100_available = False +IS_VT100_AVAILABLE = False if os.name == 'posix': from .vt100 import VT100Session - is_vt100_available = True + IS_VT100_AVAILABLE = True from .keymap_3278_2 import KEYMAP as KEYMAP_3278_2 from .keymap_3483 import KEYMAP as KEYMAP_3483 logging.basicConfig(level=logging.INFO) -controller = None +logger = logging.getLogger('oec') + +CONTROLLER = None def _get_keymap(terminal_id, extended_id): keymap = KEYMAP_3278_2 @@ -34,35 +38,58 @@ def _get_keymap(terminal_id, extended_id): return keymap -def _create_session(args, terminal): - if args.emulator == 'tn3270': - return TN3270Session(terminal, args.host, args.port) +def _create_device(args, interface, device_address, poll_response): + # Read the terminal identifiers. + (terminal_id, extended_id) = get_ids(interface, device_address) - if args.emulator == 'vt100' and is_vt100_available: + logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}') + + if terminal_id.type != TerminalType.CUT: + raise UnsupportedDeviceError('Only CUT type terminals are supported') + + # Read the terminal features. + features = get_features(interface, device_address) + + logger.info(f'Features = {features}') + + # Get the keymap. + keymap = _get_keymap(terminal_id, extended_id) + + logger.info(f'Keymap = {keymap.name}') + + # Create the terminal. + terminal = Terminal(interface, device_address, terminal_id, extended_id, features, keymap) + + return terminal + +def _create_session(args, device): + if args.emulator == 'tn3270': + return TN3270Session(device, args.host, args.port) + + if args.emulator == 'vt100' and IS_VT100_AVAILABLE: host_command = [args.command, *args.command_args] - return VT100Session(terminal, host_command) + return VT100Session(device, host_command) raise ValueError('Unsupported emulator') def _signal_handler(number, frame): - global controller + global CONTROLLER print('Stopping controller...') - if controller: - controller.stop() + if CONTROLLER: + CONTROLLER.stop() - controller = None + CONTROLLER = None signal.signal(signal.SIGINT, _signal_handler) signal.signal(signal.SIGTERM, _signal_handler) def main(): - global controller + global CONTROLLER - parser = argparse.ArgumentParser(description=('An open replacement for the IBM 3174 ' - 'Establishment Controller')) + parser = argparse.ArgumentParser(description='IBM 3270 terminal controller') parser.add_argument('serial_port', help='Serial port') @@ -75,7 +102,7 @@ def main(): tn3270_parser.add_argument('host', help='Hostname') tn3270_parser.add_argument('port', nargs='?', default=23, type=int) - if is_vt100_available: + if IS_VT100_AVAILABLE: vt100_parser = subparsers.add_parser('vt100', description='VT100 emulator', help='VT100 emulator') @@ -85,14 +112,15 @@ def main(): args = parser.parse_args() - with open_serial_interface(args.serial_port) as interface: - create_session = lambda terminal: _create_session(args, terminal) + create_device = lambda interface, device_address, poll_response: _create_device(args, interface, device_address, poll_response) + create_session = lambda device: _create_session(args, device) - controller = Controller(InterfaceWrapper(interface), _get_keymap, create_session) + with open_serial_interface(args.serial_port) as interface: + CONTROLLER = Controller(InterfaceWrapper(interface), create_device, create_session) print('Starting controller...') - controller.run() + CONTROLLER.run() if __name__ == '__main__': main() diff --git a/oec/controller.py b/oec/controller.py index 2670814..6b92ed5 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -6,39 +6,37 @@ oec.controller import time import logging import selectors -from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout, \ - ReceiveError, ProtocolError +from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout -from .interface import address_commands -from .terminal import create_terminal, UnsupportedTerminalError +from .device import address_commands, format_address, UnsupportedDeviceError from .keyboard import Key from .session import SessionDisconnectedError class Controller: """The controller.""" - def __init__(self, interface, get_keymap, create_session): + def __init__(self, interface, create_device, create_session): self.logger = logging.getLogger(__name__) + self.interface = interface self.running = False - self.interface = interface - self.get_keymap = get_keymap + self.create_device = create_device self.create_session = create_session - self.terminal = None - self.session = None + self.device = None + self.session = None self.session_selector = None - # Target time between POLL commands in seconds when a terminal is connected or - # no terminal is connected. + # Target time between POLL commands in seconds when a device is attached or + # no device is attached. # - # The connected poll period only applies in cases where the terminal responded - # with TR/TA to the last poll - this is an effort to improve the keystroke + # The attached poll period only applies in cases where the device responded + # with TT/AR to the last poll - this is an effort to improve the keystroke # responsiveness. - self.connected_poll_period = 1 / 10 - self.disconnected_poll_period = 5 + self.attached_poll_period = 1 / 10 + self.detatached_poll_period = 5 self.last_poll_time = None self.last_poll_response = None @@ -58,82 +56,54 @@ class Controller: self.session_selector = None - if self.terminal: - self.terminal = None + if self.device: + self.device = None def stop(self): + """Stop the controller.""" self.running = False def _run_loop(self): - device_address = None - poll_delay = self._calculate_poll_delay(time.perf_counter()) # If POLLing is delayed, handle the host output, otherwise just sleep. if poll_delay > 0: if self.session: - try: - self._update_session(poll_delay) - except SessionDisconnectedError: - self._handle_session_disconnected() + self._update_session(poll_delay) else: time.sleep(poll_delay) + # POLL devices. + self._poll_attached_device() + self._poll_detatched_device() + + def _update_session(self, duration): try: - poll_response = self._poll(device_address) - except ReceiveTimeout: - if self.terminal: - self._handle_terminal_detached() + update_count = 0 - return - except ReceiveError as error: - self.logger.warning(f'POLL receive error: {error}', exc_info=error) - return - except ProtocolError as error: - self.logger.warning(f'POLL protocol error: {error}', exc_info=error) - return + while duration > 0: + start_time = time.perf_counter() - if not self.terminal: - try: - self._handle_terminal_attached(device_address, poll_response) - except UnsupportedTerminalError as error: - self.logger.error(f'Unsupported terminal: {error}') - return + selected = self.session_selector.select(duration) - if poll_response: - self._handle_poll_response(poll_response) + if not selected: + break - def _handle_terminal_attached(self, device_address, poll_response): - self.logger.info('Terminal attached') + for (key, _) in selected: + session = key.fileobj - self.terminal = create_terminal(self.interface, device_address, poll_response, - self.get_keymap) + if session.handle_host(): + update_count += 1 - self.terminal.setup() + duration -= (time.perf_counter() - start_time) - # Show the attached indicator on the status line. - self.terminal.display.status_line.write_string(0, 'S') - - # Start the session. - self._start_session() - - def _handle_terminal_detached(self): - self.logger.info('Terminal detached') - - self._terminate_session() - - self.terminal = None - - def _handle_session_disconnected(self): - self.logger.info('Session disconnected') - - self._terminate_session() - - # Restart the session. - self._start_session() + if update_count > 0: + self.session.render() + except SessionDisconnectedError: + self._handle_session_disconnected() def _start_session(self): - self.session = self.create_session(self.terminal) + self.session = self.create_session(self.device) self.session.start() @@ -149,36 +119,90 @@ class Controller: self.session = None - def _update_session(self, duration): - update_count = 0 + def _handle_session_disconnected(self): + self.logger.info('Session disconnected') - while duration > 0: - start_time = time.perf_counter() + self._terminate_session() - selected = self.session_selector.select(duration) + # Restart the session. + self._start_session() - if not selected: - break + def _poll_attached_device(self): + if not self.device: + return - for (key, events) in selected: - session = key.fileobj + self.last_poll_time = time.perf_counter() - if session.handle_host(): - update_count += 1 + try: + poll_response = self.device.poll() + except ReceiveTimeout: + self._handle_device_lost() + return - duration -= (time.perf_counter() - start_time) + if poll_response: + self._poll_ack(self.device.device_address) - if update_count > 0: - self.session.render() + self._handle_poll_response(poll_response) + + self.last_poll_response = poll_response + + def _poll_detatched_device(self): + if self.device: + return + + self.last_poll_time = time.perf_counter() + + device_address = None + + try: + poll_response = self._poll(device_address) + except ReceiveTimeout: + return + + if poll_response: + self._poll_ack(device_address) + + self._handle_device_found(device_address, poll_response) + + self.last_poll_response = poll_response + + def _handle_device_found(self, device_address, poll_response): + self.logger.info(f'Found device @ {format_address(self.interface, device_address)}') + + try: + device = self.create_device(self.interface, device_address, poll_response) + except UnsupportedDeviceError as error: + self.logger.error(f'Unsupported device @ {format_address(self.interface, device_address)}: {error}') + return + + device.setup() + + self.device = device + + self.logger.info(f'Attached device @ {format_address(self.interface, device_address)}') + + self._start_session() + + def _handle_device_lost(self): + device_address = self.device.device_address + + self.logger.info(f'Lost device @ {format_address(self.interface, device_address)}') + + self._terminate_session() + + self.device = None + + self.logger.info(f'Detached device @ {format_address(self.interface, device_address)}') def _handle_poll_response(self, poll_response): if isinstance(poll_response, KeystrokePollResponse): self._handle_keystroke_poll_response(poll_response) def _handle_keystroke_poll_response(self, poll_response): + terminal = self.device scan_code = poll_response.scan_code - (key, modifiers, modifiers_changed) = self.terminal.keyboard.get_key(scan_code) + (key, modifiers, modifiers_changed) = terminal.keyboard.get_key(scan_code) if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug((f'Keystroke detected: Scan Code = {scan_code}, ' @@ -186,41 +210,27 @@ class Controller: # Update the status line if modifiers have changed. if modifiers_changed: - self.terminal.display.status_line.write_keyboard_modifiers(modifiers) + terminal.display.status_line.write_keyboard_modifiers(modifiers) if not key: return if key == Key.CURSOR_BLINK: - self.terminal.display.toggle_cursor_blink() + terminal.display.toggle_cursor_blink() elif key == Key.ALT_CURSOR: - self.terminal.display.toggle_cursor_reverse() + terminal.display.toggle_cursor_reverse() elif key == Key.CLICKER: - self.terminal.keyboard.toggle_clicker() + terminal.keyboard.toggle_clicker() elif self.session: self.session.handle_key(key, modifiers, scan_code) self.session.render() def _poll(self, device_address): - self.last_poll_time = time.perf_counter() + return self.interface.execute(address_commands(device_address, Poll())) - # If a terminal is connected, use the terminal method to ensure that - # any queued POLL action is applied. - if self.terminal: - poll_response = self.terminal.poll() - else: - poll_response = self.interface.execute(address_commands(device_address, Poll())) - - if poll_response: - try: - self.interface.execute(address_commands(device_address, PollAck())) - except ProtocolError as error: - self.logger.warning(f'POLL_ACK protocol error: {error}', exc_info=error) - - self.last_poll_response = poll_response - - return poll_response + def _poll_ack(self, device_address): + self.interface.execute(address_commands(device_address, PollAck())) def _calculate_poll_delay(self, current_time): if self.last_poll_response is not None: @@ -229,9 +239,9 @@ class Controller: if self.last_poll_time is None: return 0 - if self.terminal: - period = self.connected_poll_period + if self.device: + period = self.attached_poll_period else: - period = self.disconnected_poll_period + period = self.detatached_poll_period return max((self.last_poll_time + period) - current_time, 0) diff --git a/oec/device.py b/oec/device.py new file mode 100644 index 0000000..cd3a437 --- /dev/null +++ b/oec/device.py @@ -0,0 +1,151 @@ +""" +oec.device +~~~~~~~~~~ +""" + +import os +import time +import logging +from more_itertools import chunked +from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \ + Feature, ProtocolError + +logger = logging.getLogger(__name__) + +class Device: + """A device.""" + + def __init__(self, interface, device_address): + self.interface = interface + self.device_address = device_address + + def setup(self): + """Setup the device.""" + raise NotImplementedError + + def poll(self): + """POLL the device.""" + raise NotImplementedError + + def execute(self, commands): + """Execute one or more commands.""" + return self.interface.execute(address_commands(self.device_address, commands)) + + def execute_jumbo_write(self, data, create_first, create_subsequent, first_chunk_max_length_adjustment=-1): + """Execute a jumbo write command that can be split.""" + max_length = None + + # The 3299 multiplexer appears to have some frame length limit, after which it will + # stop transmitting. I've not determined the actual limit, but 1024 appears to work. + if self.device_address is not None: + max_length = 1024 + elif self.interface.jumbo_write_strategy == 'split': + max_length = self.interface.jumbo_write_max_length + + chunks = _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment) + + commands = [create_first(chunks[0])] + + for chunk in chunks[1:]: + commands.append(create_subsequent(chunk)) + + if len(commands) > 1 and logger.isEnabledFor(logging.DEBUG): + logger.debug(f'Jumbo write split into {len(commands)}') + + return self.execute(commands) + +class UnsupportedDeviceError(Exception): + """Unsupported device.""" + +def address_commands(device_address, commands): + """Add device address to commands.""" + if isinstance(commands, list): + return [(device_address, command) for command in commands] + + return (device_address, commands) + +def format_address(interface, device_address): + """Format a device address.""" + if device_address is None: + return interface.identifier + + raise NotImplementedError + +def get_ids(interface, device_address, extended_id_retry_attempts=3): + terminal_id = None + extended_id = None + + try: + terminal_id = interface.execute(address_commands(device_address, ReadTerminalId())) + except ProtocolError as error: + logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error) + + # Retry the READ_EXTENDED_ID command as it appears to fail frequently on the + # first request - unlike the READ_TERMINAL_ID command, + extended_id = None + + for attempt in range(extended_id_retry_attempts): + try: + extended_id = interface.execute(address_commands(device_address, ReadExtendedId())) + + break + except ProtocolError as error: + logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error) + + time.sleep(0.25) + + return (terminal_id, extended_id.hex() if extended_id is not None else None) + +def get_features(interface, device_address): + commands = read_feature_ids() + + ids = interface.execute([address_commands(device_address, command) for command in commands]) + + features = parse_features(ids, commands) + + # Add override features - for example, this can be used to add an unreported + # EAB feature to a IBM 3179 terminal. + if 'COAX_FEATURES' in os.environ: + for override in os.environ['COAX_FEATURES'].split(','): + if '@' not in override: + logger.warning(f'Invalid feature override: {override}') + continue + + (name, address) = override.split('@') + + try: + feature = Feature[name] + except KeyError: + logger.warning(f'Invalid feature override: {override}') + continue + + try: + address = int(address) + except ValueError: + logger.warning(f'Invalid feature override: {override}') + continue + + logger.info(f'Adding override feature {feature} @ {address}') + + features[feature] = address + + return features + +def _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment=-1): + if max_length is None: + return [data] + + if isinstance(data, tuple): + length = len(data[0]) * data[1] + else: + length = len(data) + + first_chunk_max_length = max_length + first_chunk_max_length_adjustment + + if length <= first_chunk_max_length: + return [data] + + if isinstance(data, tuple): + data = data[0] * data[1] + + return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], max_length)] diff --git a/oec/interface.py b/oec/interface.py index e6fa5c8..ffdbe36 100644 --- a/oec/interface.py +++ b/oec/interface.py @@ -32,6 +32,9 @@ class InterfaceWrapper: _print_i1_jumbo_write_notice(self.jumbo_write_max_length) def __getattr__(self, attr): + if attr == 'identifier': + return self.interface.serial.port + return getattr(self.interface, attr) def execute(self, commands): @@ -47,12 +50,6 @@ class InterfaceWrapper: return responses -def address_commands(device_address, commands): - if isinstance(commands, list): - return [(device_address, command) for command in commands] - - return (device_address, commands) - def _get_jumbo_write_strategy(): value = os.environ.get('COAX_JUMBO') diff --git a/oec/terminal.py b/oec/terminal.py index a4f58aa..1928944 100644 --- a/oec/terminal.py +++ b/oec/terminal.py @@ -3,20 +3,12 @@ oec.terminal ~~~~~~~~~~~~ """ -import os -import time -import logging -from more_itertools import chunked -from coax import read_feature_ids, parse_features, Poll, ReadTerminalId, ReadExtendedId, \ - LoadControlRegister, TerminalType, Feature, PollAction, Control, \ - ProtocolError +from coax import Poll, LoadControlRegister, Feature, PollAction, Control -from .interface import address_commands +from .device import Device, UnsupportedDeviceError from .display import Dimensions, BufferedDisplay from .keyboard import Keyboard -logger = logging.getLogger(__name__) - MODEL_DIMENSIONS = { 2: Dimensions(24, 80), 3: Dimensions(32, 80), @@ -24,17 +16,21 @@ MODEL_DIMENSIONS = { 5: Dimensions(27, 132) } -class Terminal: +class Terminal(Device): """The terminal.""" - def __init__(self, interface, device_address, terminal_id, extended_id, dimensions, - features, keymap): - self.interface = interface - self.device_address = device_address + def __init__(self, interface, device_address, terminal_id, extended_id, features, keymap): + super().__init__(interface, device_address) + self.terminal_id = terminal_id self.extended_id = extended_id self.features = features + dimensions = MODEL_DIMENSIONS.get(terminal_id.model) + + if not dimensions: + raise UnsupportedDeviceError(f'Terminal model {terminal_id.model} is not supported') + self.control = Control(step_inhibit=False, display_inhibit=False, cursor_inhibit=False, cursor_reverse=False, cursor_blink=False) @@ -54,6 +50,9 @@ class Terminal: self.display.clear(clear_status_line=True) + # Show the attached indicator on the status line. + self.display.status_line.write_string(0, 'OEC') + def poll(self): """Execute a POLL command with queued actions.""" poll_action = PollAction.NONE @@ -86,137 +85,3 @@ class Terminal: def load_control_register(self): """Execute a LOAD_CONTROL_REGISTER command.""" self.execute(LoadControlRegister(self.control)) - - def execute(self, commands): - return self.interface.execute(address_commands(self.device_address, commands)) - - def execute_jumbo_write(self, data, create_first, create_subsequent, first_chunk_max_length_adjustment=-1): - max_length = None - - if self.interface.jumbo_write_strategy == 'split': - max_length = self.interface.jumbo_write_max_length - - chunks = _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment) - - commands = [create_first(chunks[0])] - - for chunk in chunks[1:]: - commands.append(create_subsequent(chunk)) - - return self.execute(commands) - -class UnsupportedTerminalError(Exception): - """Unsupported terminal.""" - -def create_terminal(interface, device_address, poll_response, get_keymap): - """Terminal factory.""" - # Read the terminal identifiers. - (terminal_id, extended_id) = _read_terminal_ids(interface, device_address) - - logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}') - - if terminal_id.type != TerminalType.CUT: - raise UnsupportedTerminalError('Only CUT type terminals are supported') - - # Get the terminal dimensions. - dimensions = MODEL_DIMENSIONS.get(terminal_id.model) - - if dimensions is None: - raise UnsupportedTerminalError(f'Model {terminal_id.model} is not supported') - - logger.info(f'Rows = {dimensions.rows}, Columns = {dimensions.columns}') - - # Get the terminal features. - features = _get_features(interface, device_address) - - logger.info(f'Features = {features}') - - # Get the keymap. - keymap = get_keymap(terminal_id, extended_id) - - logger.info(f'Keymap = {keymap.name}') - - # Create the terminal. - terminal = Terminal(interface, device_address, terminal_id, extended_id, dimensions, - features, keymap) - - return terminal - -def _read_terminal_ids(interface, device_address, extended_id_retry_attempts=3): - terminal_id = None - extended_id = None - - try: - terminal_id = interface.execute(address_commands(device_address, ReadTerminalId())) - except ProtocolError as error: - logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error) - - # Retry the READ_EXTENDED_ID command as it appears to fail frequently on the - # first request - unlike the READ_TERMINAL_ID command, - extended_id = None - - for attempt in range(extended_id_retry_attempts): - try: - extended_id = interface.execute(address_commands(device_address, ReadExtendedId())) - - break - except ProtocolError as error: - logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error) - - time.sleep(0.25) - - return (terminal_id, extended_id.hex() if extended_id is not None else None) - -def _get_features(interface, device_address): - commands = read_feature_ids() - - ids = interface.execute([address_commands(device_address, command) for command in commands]) - - features = parse_features(ids, commands) - - # Add override features - for example, this can be used to add an unreported - # EAB feature to a IBM 3179 terminal. - if 'COAX_FEATURES' in os.environ: - for override in os.environ['COAX_FEATURES'].split(','): - if '@' not in override: - logger.warning(f'Invalid feature override: {override}') - continue - - (name, address) = override.split('@') - - try: - feature = Feature[name] - except KeyError: - logger.warning(f'Invalid feature override: {override}') - continue - - try: - address = int(address) - except ValueError: - logger.warning(f'Invalid feature override: {override}') - continue - - logger.info(f'Adding override feature {feature} @ {address}') - - features[feature] = address - - return features - -def _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment=-1): - if max_length is None: - return [data] - - if isinstance(data, tuple): - length = len(data[0]) * data[1] - else: - length = len(data) - - first_chunk_max_length = max_length + first_chunk_max_length_adjustment - - if length <= first_chunk_max_length: - return [data] - - if isinstance(data, tuple): - data = data[0] * data[1] - - return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], max_length)] diff --git a/tests/mock_interface.py b/tests/mock_interface.py index fd737bd..178e9f8 100644 --- a/tests/mock_interface.py +++ b/tests/mock_interface.py @@ -7,6 +7,8 @@ class MockInterface(Interface): def __init__(self, responses=[]): self.mock_responses = responses + self.serial = Mock(port='/dev/mock') + self.legacy_firmware_detected = None self.legacy_firmware_version = None diff --git a/tests/test_controller.py b/tests/test_controller.py index 16be1b2..dcef935 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -4,15 +4,15 @@ from unittest.mock import Mock, create_autospec, patch import selectors from selectors import BaseSelector from logging import Logger -from coax import Poll, PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout +from coax import Poll, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout from coax.protocol import TerminalId import context from oec.interface import InterfaceWrapper from oec.controller import Controller -from oec.terminal import Terminal, UnsupportedTerminalError -from oec.display import Dimensions +from oec.device import UnsupportedDeviceError +from oec.terminal import Terminal from oec.keyboard import KeyboardModifiers, Key from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 from oec.session import Session, SessionDisconnectedError @@ -23,22 +23,9 @@ class RunLoopTestCase(unittest.TestCase): def setUp(self): self.interface = MockInterface() - self.session = create_autospec(Session, instance=True) - self.create_session = Mock(return_value=self.session) + self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', { }, KEYMAP_3278_2) - self.controller = Controller(InterfaceWrapper(self.interface), lambda terminal_id, extended_id: KEYMAP_3278_2, self.create_session) - - self.controller.logger = create_autospec(Logger, instance=True) - - self.controller.connected_poll_period = 1 - - self.controller.session_selector = create_autospec(BaseSelector, instance=True) - - self.controller._update_session = Mock() - - self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2) - - self.terminal.setup = Mock() + self.terminal.setup = Mock(Terminal, instance=True) self.terminal.display.write = Mock() self.terminal.display.toggle_cursor_blink = Mock() @@ -46,6 +33,21 @@ class RunLoopTestCase(unittest.TestCase): self.terminal.keyboard.toggle_clicker = Mock() + self.create_device = Mock(return_value=self.terminal) + + self.session = create_autospec(Session, instance=True) + self.create_session = Mock(return_value=self.session) + + self.controller = Controller(InterfaceWrapper(self.interface), self.create_device, self.create_session) + + self.controller.logger = create_autospec(Logger, instance=True) + + self.controller.attached_poll_period = 1 + + self.controller.session_selector = create_autospec(BaseSelector, instance=True) + + self.controller._update_session = Mock(wraps=self.controller._update_session) + patcher = patch('oec.controller.time.perf_counter') self.perf_counter = patcher.start() @@ -54,37 +56,31 @@ class RunLoopTestCase(unittest.TestCase): self.sleep = patcher.start() - patcher = patch('oec.controller.create_terminal') - - self.create_terminal = patcher.start() - - self.create_terminal.return_value = self.terminal - self.addCleanup(patch.stopall) - def test_no_terminal(self): + def test_no_device(self): self._assert_run_loop(0, ReceiveTimeout, False, 0, False) self._assert_run_loop(1, ReceiveTimeout, False, 4, False) - self.assertIsNone(self.controller.terminal) + self.assertIsNone(self.controller.device) self.assertIsNone(self.controller.session) - def test_terminal_attached(self): + def test_device_attached(self): self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) self._assert_run_loop(0, None, False, 0, False) self._assert_run_loop(0.5, None, True, 0.5, False) - self.assertIsNotNone(self.controller.terminal) + self.assertIsNotNone(self.controller.device) self.assertIsNotNone(self.controller.session) self.controller._update_session.assert_called() - def test_unsupported_terminal_attached(self): - self.create_terminal.side_effect = [UnsupportedTerminalError] + def test_unsupported_device_attached(self): + self.create_device.side_effect = [UnsupportedDeviceError] self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self.assertIsNone(self.controller.terminal) + self.assertIsNone(self.controller.device) self.assertIsNone(self.controller.session) def test_keystroke(self): @@ -93,30 +89,34 @@ class RunLoopTestCase(unittest.TestCase): self._assert_run_loop(0, None, False, 0, False) self._assert_run_loop(0.5, None, True, 0.5, False) - self.assertIsNotNone(self.controller.terminal) + self.assertIsNotNone(self.controller.device) self.assertIsNotNone(self.controller.session) self.controller.session.handle_key.assert_called_with(Key.LOWER_A, KeyboardModifiers.NONE, 96) - def test_terminal_detached(self): + def test_device_detatched(self): self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) self._assert_run_loop(0, None, False, 0, False) self._assert_run_loop(0.5, ReceiveTimeout, True, 0.5, False) - self.assertIsNone(self.controller.terminal) + self.assertIsNone(self.controller.device) self.assertIsNone(self.controller.session) self.session.terminate.assert_called() def test_session_disconnected(self): - self.controller._update_session.side_effect = [None, SessionDisconnectedError, None] + selector_key = Mock(fileobj=self.session) + + self.controller.session_selector.select.return_value = [(selector_key, selectors.EVENT_READ)] + + self.session.handle_host = Mock(side_effect=[None, SessionDisconnectedError]) self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) self._assert_run_loop(0, None, False, 0, False) self._assert_run_loop(0.5, None, True, 0.5, False) - self._assert_run_loop(1.5, None, True, 0.5, False) + self._assert_run_loop(1.5, None, True, 3, False) - self.assertIsNotNone(self.controller.terminal) + self.assertIsNotNone(self.controller.device) self.assertIsNotNone(self.controller.session) self.assertEqual(self.create_session.call_count, 2) @@ -174,7 +174,16 @@ class RunLoopTestCase(unittest.TestCase): self.interface.reset_mock() - self.perf_counter.side_effect = [poll_time, poll_time + expected_poll_delay] + def perf_counter(): + nonlocal poll_time + + time = poll_time + + poll_time += expected_update_session + + return time + + self.perf_counter.side_effect = perf_counter self.sleep.reset_mock() diff --git a/tests/test_device.py b/tests/test_device.py new file mode 100644 index 0000000..74f4453 --- /dev/null +++ b/tests/test_device.py @@ -0,0 +1,189 @@ +import unittest +from unittest.mock import Mock, patch + +from logging import Logger +from coax import TerminalType, Feature, ReadAddressCounterHi, ReadAddressCounterLo, ReadTerminalId, ReadExtendedId, ReadFeatureId, ProtocolError +from coax.protocol import TerminalId + +import context + +from oec.interface import InterfaceWrapper +from oec.device import address_commands, format_address, get_ids, get_features, _jumbo_write_split_data + +from mock_interface import MockInterface + +class AddressCommandsTestCase(unittest.TestCase): + def test_single_command(self): + # Arrange + command = ReadAddressCounterHi() + + # Act + result = address_commands(0b111000, command) + + # Assert + self.assertEqual(result, (0b111000, command)) + + def test_multiple_commands(self): + # Arrange + commands = [ReadAddressCounterHi(), ReadAddressCounterLo()] + + # Act + result = address_commands(0b111000, commands) + + # Assert + self.assertEqual(result, [(0b111000, commands[0]), (0b111000, commands[1])]) + +class FormatAddressTestCase(unittest.TestCase): + def setUp(self): + self.interface = MockInterface() + + def test_no_port(self): + self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock') + + def test_multiplexer(self): + with self.assertRaises(NotImplementedError): + format_address(InterfaceWrapper(self.interface), 0b110000) + +class GetIdsTestCase(unittest.TestCase): + def setUp(self): + self.interface = MockInterface() + + patcher = patch('oec.device.logger', autospec=Logger) + + self.logger = patcher.start() + + patcher = patch('oec.device.time.sleep') + + self.sleep = patcher.start() + + self.addCleanup(patch.stopall) + + def test_no_extended_id(self): + # Arrange + self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b11110100))] + + # Act + (terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(terminal_id.type, TerminalType.CUT) + self.assertEqual(terminal_id.model, 2) + self.assertEqual(terminal_id.keyboard, 15) + self.assertIsNone(extended_id) + + def test_extended_id(self): + # Arrange + self.interface.mock_responses = [ + (None, ReadTerminalId, None, TerminalId(0b11110100)), + (None, ReadExtendedId, None, bytes.fromhex('01 02 03 04')) + ] + + # Act + (terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(terminal_id.type, TerminalType.CUT) + self.assertEqual(terminal_id.model, 2) + self.assertEqual(terminal_id.keyboard, 15) + self.assertEqual(extended_id, '01020304') + + def test_extended_id_second_attempt(self): + # Arrange + self.interface.mock_responses = [ + (None, ReadTerminalId, None, TerminalId(0b11110100)), + (None, ReadExtendedId, None, Mock(side_effect=[ProtocolError, bytes.fromhex('01 02 03 04')])) + ] + + # Act + (terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(terminal_id.type, TerminalType.CUT) + self.assertEqual(terminal_id.model, 2) + self.assertEqual(terminal_id.keyboard, 15) + self.assertEqual(extended_id, '01020304') + + self.logger.warning.assert_called() + + def test_extended_id_failed(self): + # Arrange + self.interface.mock_responses = [ + (None, ReadTerminalId, None, TerminalId(0b11110100)), + (None, ReadExtendedId, None, Mock(side_effect=ProtocolError)) + ] + + # Act + (terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(terminal_id.type, TerminalType.CUT) + self.assertEqual(terminal_id.model, 2) + self.assertEqual(terminal_id.keyboard, 15) + self.assertIsNone(extended_id) + + self.logger.warning.assert_called() + +class GetFeaturesTestCase(unittest.TestCase): + def setUp(self): + self.interface = MockInterface() + + def test_no_features(self): + # Act + features = get_features(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(features, { }) + + def test_eab_feature(self): + # Arrange + self.interface.mock_responses = [(None, ReadFeatureId, lambda command: command.feature_address == 7, Feature.EAB.value)] + + # Act + features = get_features(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(features, { Feature.EAB: 7 }) + + def test_override(self): + # Act + with patch.dict('oec.device.os.environ', { 'COAX_FEATURES': 'EAB@7' }): + features = get_features(InterfaceWrapper(self.interface), None) + + # Assert + self.assertEqual(features, { Feature.EAB: 7 }) + +class JumboWriteSplitDataTestCase(unittest.TestCase): + def test_no_split_strategy(self): + for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, None) + + self.assertEqual(len(result), 1) + + self.assertEqual(result[0], data) + + def test_split_strategy_one_chunk(self): + for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, 32) + + self.assertEqual(len(result), 1) + + self.assertEqual(result[0], data) + + def test_split_strategy_two_chunks(self): + for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, 32) + + self.assertEqual(len(result), 2) + self.assertEqual(len(result[0]), 31) + + def test_split_strategy_three_chunks(self): + for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, 32) + + self.assertEqual(len(result), 3) + self.assertEqual(len(result[0]), 31) + self.assertEqual(len(result[1]), 32) diff --git a/tests/test_display.py b/tests/test_display.py index c256efe..5df8429 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -842,25 +842,23 @@ class EncodeStringTestCase(unittest.TestCase): def _create_display(interface): terminal_id = TerminalId(0b11110100) extended_id = 'c1348300' - dimensions = Dimensions(24, 80) features = { } keymap = KEYMAP_3278_2 - terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap) - display = Display(terminal, dimensions, features.get(Feature.EAB)) + display = Display(terminal, terminal.display.dimensions, features.get(Feature.EAB)) return display def _create_buffered_display(interface, has_eab=False): terminal_id = TerminalId(0b11110100) extended_id = 'c1348300' - dimensions = Dimensions(24, 80) features = { Feature.EAB: 7 } if has_eab else { } keymap = KEYMAP_3278_2 - terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap) - buffered_display = BufferedDisplay(terminal, dimensions, features.get(Feature.EAB)) + buffered_display = BufferedDisplay(terminal, terminal.display.dimensions, features.get(Feature.EAB)) return buffered_display diff --git a/tests/test_interface.py b/tests/test_interface.py index 05a4fc6..7aee99f 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -5,7 +5,7 @@ from coax import ReadAddressCounterHi, ReadAddressCounterLo, ProtocolError import context -from oec.interface import InterfaceWrapper, AggregateExecuteError, address_commands +from oec.interface import InterfaceWrapper, AggregateExecuteError from mock_interface import MockInterface @@ -105,24 +105,3 @@ class InterfaceWrapperExecuteTestCase(unittest.TestCase): self.assertIsInstance(error.errors[0], ProtocolError) self.assertEqual(len(error.responses), 2) - -class AddressCommandsTestCase(unittest.TestCase): - def test_single_command(self): - # Arrange - command = ReadAddressCounterHi() - - # Act - result = address_commands(0b111000, command) - - # Assert - self.assertEqual(result, (0b111000, command)) - - def test_multiple_commands(self): - # Arrange - commands = [ReadAddressCounterHi(), ReadAddressCounterLo()] - - # Act - result = address_commands(0b111000, commands) - - # Assert - self.assertEqual(result, [(0b111000, commands[0]), (0b111000, commands[1])]) diff --git a/tests/test_terminal.py b/tests/test_terminal.py index 6403c23..dce62ca 100644 --- a/tests/test_terminal.py +++ b/tests/test_terminal.py @@ -1,17 +1,36 @@ import unittest -from unittest.mock import Mock, create_autospec, patch -from coax import Poll, PollAction, TerminalType, Feature, ReadTerminalId, ReadExtendedId, ReadFeatureId +from unittest.mock import create_autospec +from coax import Poll, PollAction from coax.protocol import TerminalId import context from oec.interface import InterfaceWrapper -from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError, _jumbo_write_split_data -from oec.display import Display, Dimensions +from oec.device import UnsupportedDeviceError +from oec.terminal import Terminal +from oec.display import Display, StatusLine from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 from mock_interface import MockInterface +class InitTerminalTestCase(unittest.TestCase): + def test_supported_terminal_model(self): + # Arrange + terminal_id = TerminalId(0b11110100) + + # Act + Terminal(None, None, terminal_id, None, { }, KEYMAP_3278_2) + + def test_unsupported_terminal_model(self): + # Arrange + terminal_id = TerminalId(0b11110100) + + terminal_id.model = 1 + + # Act and assert + with self.assertRaises(UnsupportedDeviceError): + Terminal(None, None, terminal_id, None, { }, KEYMAP_3278_2) + class TerminalSetupTestCase(unittest.TestCase): def setUp(self): self.interface = MockInterface() @@ -19,6 +38,7 @@ class TerminalSetupTestCase(unittest.TestCase): self.terminal = _create_terminal(self.interface) self.terminal.display = create_autospec(Display, instance=True) + self.terminal.display.status_line = create_autospec(StatusLine, instance=True) def test(self): self.terminal.setup() @@ -70,143 +90,12 @@ class TerminalPollTestCase(unittest.TestCase): def assert_poll_with_poll_action(self, action): self.interface.assert_command_executed(None, Poll, lambda command: command.action == action) -class CreateTerminalTestCase(unittest.TestCase): - def setUp(self): - self.interface = MockInterface() - - self.get_keymap = lambda terminal_id, extended_id: KEYMAP_3278_2 - - def test_supported_terminal_with_no_features(self): - # Arrange - interface = InterfaceWrapper(self.interface) - - self.interface.mock_responses = [ - (None, ReadTerminalId, None, TerminalId(0b11110100)), - (None, ReadExtendedId, None, bytes.fromhex('00 00 00 00')) - ] - - # Act - terminal = create_terminal(interface, None, None, self.get_keymap) - - # Assert - self.assertEqual(terminal.terminal_id.type, TerminalType.CUT) - self.assertEqual(terminal.terminal_id.model, 2) - self.assertEqual(terminal.terminal_id.keyboard, 15) - self.assertEqual(terminal.extended_id, '00000000') - self.assertEqual(terminal.display.dimensions, Dimensions(24, 80)) - self.assertEqual(terminal.features, { }) - self.assertEqual(terminal.keyboard.keymap.name, '3278-2') - - def test_supported_terminal_with_eab_feature(self): - # Arrange - interface = InterfaceWrapper(self.interface) - - self.interface.mock_responses = [ - (None, ReadTerminalId, None, TerminalId(0b11110100)), - (None, ReadExtendedId, None, bytes.fromhex('c1 34 83 00')), - (None, ReadFeatureId, lambda command: command.feature_address == 7, Feature.EAB.value) - ] - - # Act - terminal = create_terminal(interface, None, None, self.get_keymap) - - # Assert - self.assertEqual(terminal.terminal_id.type, TerminalType.CUT) - self.assertEqual(terminal.terminal_id.model, 2) - self.assertEqual(terminal.terminal_id.keyboard, 15) - self.assertEqual(terminal.extended_id, 'c1348300') - self.assertEqual(terminal.display.dimensions, Dimensions(24, 80)) - self.assertEqual(terminal.features, { Feature.EAB: 7 }) - self.assertEqual(terminal.keyboard.keymap.name, '3278-2') - - def test_supported_terminal_features_override(self): - # Arrange - interface = InterfaceWrapper(self.interface) - - self.interface.mock_responses = [ - (None, ReadTerminalId, None, TerminalId(0b11110100)), - (None, ReadExtendedId, None, bytes.fromhex('00 00 00 00')) - ] - - # Act - with patch.dict('oec.terminal.os.environ', { 'COAX_FEATURES': 'EAB@7' }): - terminal = create_terminal(interface, None, None, self.get_keymap) - - # Assert - self.assertEqual(terminal.terminal_id.type, TerminalType.CUT) - self.assertEqual(terminal.terminal_id.model, 2) - self.assertEqual(terminal.terminal_id.keyboard, 15) - self.assertEqual(terminal.extended_id, '00000000') - self.assertEqual(terminal.display.dimensions, Dimensions(24, 80)) - self.assertEqual(terminal.features, { Feature.EAB: 7 }) - self.assertEqual(terminal.keyboard.keymap.name, '3278-2') - - def test_unsupported_terminal_type(self): - # Arrange - interface = InterfaceWrapper(self.interface) - - self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b00000001))] - - # Act and assert - with self.assertRaises(UnsupportedTerminalError): - create_terminal(interface, None, None, self.get_keymap) - - def test_unsupported_terminal_model(self): - # Arrange - interface = InterfaceWrapper(self.interface) - terminal_id = TerminalId(0b11110100) - - terminal_id.model = 1 - - self.interface.mock_responses = [(None, ReadTerminalId, None, terminal_id)] - - # Act and assert - with self.assertRaises(UnsupportedTerminalError): - create_terminal(interface, None, None, self.get_keymap) - -class JumboWriteSplitDataTestCase(unittest.TestCase): - def test_no_split_strategy(self): - for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]: - with self.subTest(data=data): - result = _jumbo_write_split_data(data, None) - - self.assertEqual(len(result), 1) - - self.assertEqual(result[0], data) - - def test_split_strategy_one_chunk(self): - for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]: - with self.subTest(data=data): - result = _jumbo_write_split_data(data, 32) - - self.assertEqual(len(result), 1) - - self.assertEqual(result[0], data) - - def test_split_strategy_two_chunks(self): - for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]: - with self.subTest(data=data): - result = _jumbo_write_split_data(data, 32) - - self.assertEqual(len(result), 2) - self.assertEqual(len(result[0]), 31) - - def test_split_strategy_three_chunks(self): - for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]: - with self.subTest(data=data): - result = _jumbo_write_split_data(data, 32) - - self.assertEqual(len(result), 3) - self.assertEqual(len(result[0]), 31) - self.assertEqual(len(result[1]), 32) - def _create_terminal(interface): terminal_id = TerminalId(0b11110100) extended_id = 'c1348300' - dimensions = Dimensions(24, 80) features = { } keymap = KEYMAP_3278_2 - terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap) return terminal diff --git a/tests/test_tn3270.py b/tests/test_tn3270.py index 40a90d0..6c4935f 100644 --- a/tests/test_tn3270.py +++ b/tests/test_tn3270.py @@ -370,11 +370,10 @@ class SessionRenderTestCase(unittest.TestCase): def _create_terminal(interface): terminal_id = TerminalId(0b11110100) extended_id = 'c1348300' - dimensions = Dimensions(24, 80) features = { } keymap = KEYMAP_3278_2 - terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap) terminal.display.status_line = create_autospec(StatusLine, instance=True) diff --git a/tests/test_vt100.py b/tests/test_vt100.py index 2ee70e0..1a74789 100644 --- a/tests/test_vt100.py +++ b/tests/test_vt100.py @@ -178,10 +178,9 @@ class SessionRenderTestCase(unittest.TestCase): def _create_terminal(interface): terminal_id = TerminalId(0b11110100) extended_id = 'c1348300' - dimensions = Dimensions(24, 80) features = { } keymap = KEYMAP_3278_2 - terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, features, keymap) return terminal